flink checkpoint 延迟的性能问题讨论

2024-06-16 文章 15868861416
各位大佬,
背景:
实际测试flink读Kafka 数据写入hudi, checkpoint的间隔时间是1min, 
state.backend分别为filesystem,测试结果如下:



写hudi的checkpoint 的延迟





写iceberg得延迟:



疑问: hudi的checkpoint的文件数据比iceberg要大很多,如何降低flink写hudi的checkpoint的延迟?


| |
博星
|
|
15868861...@163.com
|



Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-23 文章 yuanfeng hu


> 2024年1月18日 14:59,fufu  写道:
> 
> 看hdfs上shard文件比chk-xxx要大很多。
> 
> 
> 
> 在 2024-01-18 14:49:14,"fufu"  写道:
> 
> 是datastream作业,窗口算子本身没有设置TTL,其余算子设置了TTL,是在Flink 
> UI上看到窗口算子的size不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~
> 
> 在 2024-01-18 10:56:51,"Zakelly Lan"  写道:
> 
>> 你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
>> TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
>> 
>> On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:
>> 
>>> 
>>> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
>>> https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
>>> 请社区指导下,或者有没有别的解决方案?感谢社区!
Manifest  
文件是会一直增大的,flink没有提供参数给你设置,如果需要设置的话需要实现ConfigurableRocksDBOptionsFactory,在我们的实践中合理设置manifest大小是对checkpoint大小有作用的

关于 flink Async io checkpoint restore

2024-01-23 文章 zhhui yan
HI All
flink 1.18.0 jdk 17 使用异步IO 失败后无法恢复,一直报序列化问题;
我调整使用 string 类型和bytes 都不能够恢复
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(
DefaultOperatorStateBackendBuilder.java:88)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:533)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:280)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.IOException: Corrupt stream, found tag: 93
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
.deserialize(StreamElementSerializer.java:201)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
.deserialize(StreamElementSerializer.java:43)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation
.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:231)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(
OperatorStateRestoreOperation.java:201)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(
DefaultOperatorStateBackendBuilder.java:85)
... 17 more
-- 
best with you!
zhhuiyan


Re: Re:Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 Zakelly Lan
图挂了看不到,不然你把文字信息简单复制下来看看?
另外你的ProcessWindowFunction里是否会访问state,如果访问了,是否实现了clear方法?

On Thu, Jan 18, 2024 at 3:01 PM fufu  wrote:

> 看hdfs上shard文件比chk-xxx要大很多。
>
>
>
> 在 2024-01-18 14:49:14,"fufu"  写道:
>
> 是datastream作业,窗口算子本身没有设置TTL,其余算子设置了TTL,是在Flink
> UI上看到窗口算子的size不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~
>
> 在 2024-01-18 10:56:51,"Zakelly Lan"  写道:
>
> >你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
> >TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
> >
> >On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:
> >
> >>
> >>
> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
> >> https://blog.csdn.net/RL_LEEE/article/details/123864487
> ,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
> >> 请社区指导下,或者有没有别的解决方案?感谢社区!
>


Re:Re:Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 fufu
看hdfs上shard文件比chk-xxx要大很多。



在 2024-01-18 14:49:14,"fufu"  写道:

是datastream作业,窗口算子本身没有设置TTL,其余算子设置了TTL,是在Flink 
UI上看到窗口算子的size不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~

在 2024-01-18 10:56:51,"Zakelly Lan"  写道:

>你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
>TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
>
>On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:
>
>>
>> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
>> https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
>> 请社区指导下,或者有没有别的解决方案?感谢社区!


Re:Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 fufu
是datastream作业,窗口算子本身没有设置TTL,其余算子设置了TTL,是在Flink 
UI上看到窗口算子的size不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~

在 2024-01-18 10:56:51,"Zakelly Lan"  写道:

>你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
>TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
>
>On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:
>
>>
>> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
>> https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
>> 请社区指导下,或者有没有别的解决方案?感谢社区!


Re: RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 Zakelly Lan
你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大

On Wed, Jan 17, 2024 at 4:09 PM fufu  wrote:

>
> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:
> https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
> 请社区指导下,或者有没有别的解决方案?感谢社区!


RocksDB增量模式checkpoint大小持续增长的问题

2024-01-17 文章 fufu
我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
 请社区指导下,或者有没有别的解决方案?感谢社区!

回复: flink-checkpoint 问题

2024-01-11 文章 吴先生
看现象是这样,谢了,我抽空看下这块源码


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月11日 16:33 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
看了下代码,这个问题有可能的原因是:
1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log
的,所以有概率是目录创建了,但是log没输出trigger
2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能是先执行了创建25548目录的操作然后作业再失败,然后trigger
25548还没输出就退了。

版本1.14.5之后代码已经把上述1行为改了,先打log再创建目录,就不会有这样奇怪的问题了。



On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com> wrote:

TM日志:
2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task
and sending final execution state CANCELED to JobManager for task
ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0
e960208bbd95b1b219bafe4887b48392.
2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR
o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered
error while consuming partitions
java.nio.channels.ClosedChannelException: null
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160)
at org.apache.flink.runtime.io
.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)


JM日志,没有25548的触发记录:
2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347

Re: flink-checkpoint 问题

2024-01-11 文章 Zakelly Lan
看了下代码,这个问题有可能的原因是:
1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log
的,所以有概率是目录创建了,但是log没输出trigger
2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能是先执行了创建25548目录的操作然后作业再失败,然后trigger
25548还没输出就退了。

版本1.14.5之后代码已经把上述1行为改了,先打log再创建目录,就不会有这样奇怪的问题了。



On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com> wrote:

> TM日志:
> 2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task
> and sending final execution state CANCELED to JobManager for task
> ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0
> e960208bbd95b1b219bafe4887b48392.
> 2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR
> o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered
> error while consuming partitions
> java.nio.channels.ClosedChannelException: null
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134)
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160)
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
>  at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  at java.lang.Thread.run(Thread.java:

回复: flink-checkpoint 问题

2024-01-10 文章 吴先生
TM日志:
2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and 
sending final execution state CANCELED to JobManager for task 
ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0 
e960208bbd95b1b219bafe4887b48392.
2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR 
o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error 
while consuming partitions
java.nio.channels.ClosedChannelException: null
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134)
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160)
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 at java.lang.Thread.run(Thread.java:748)


JM日志,没有25548的触发记录:
2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347921 bytes in 
50128 ms).
2023-12-31 18:40:10.681 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 25547 (type=CHECKPOINT) @ 1704019210665 for job 
d12f3c6e836f56fb23d96e31737ff0b3.
2023-12-31 18:50:10.681 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 25547 of 
job d12f3c6e836f56fb23d96e31737ff

Re:回复: flink-checkpoint 问题

2024-01-10 文章 Xuyang
Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。




--

Best!
Xuyang




在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道:

JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




回复: flink-checkpoint 问题

2024-01-10 文章 吴先生
JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




Re:flink-checkpoint 问题

2024-01-10 文章 ouywl
我记得flink低版本有这个bug,会错误的删除某一个checkpoint的,你这个版本太老了,可以升级到新版本。


The following is the content of the forwarded email
From:"吴先生" <15951914...@163.com>
To:user-zh 
Date:2024-01-10 17:54:42
Subject:flink-checkpoint 问题

Flink版本: 1.12
checkpoint配置:hdfs
现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




Re: flink-checkpoint 问题

2024-01-10 文章 Zakelly Lan
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

> Flink版本: 1.12
> checkpoint配置:hdfs
>
> 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
>
>


flink-checkpoint 问题

2024-01-10 文章 吴先生
Flink版本: 1.12
checkpoint配置:hdfs
现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的



在使用使用jemalloc内存分配器一段时间后,出现checkpoint 超时,任务卡住的情况

2023-09-24 文章 rui chen
在使用使用jemalloc内存分配器一段时间后,出现checkpoint
超时,任务卡住的情况,哪位遇到过呢?flink版本:flink-1.13.2,jiemalloc版本:5.3.0


After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-24 文章 rui chen
After using the jemalloc memory allocator for a period of time, checkpoint
timeout occurs and tasks are stuck. Who has encountered this? flink
version:1.13.2, jiemalloc version: 5.3.0


checkpoint原理和2pc原理

2023-08-20 文章 zyzandmz


问题一:
做checkpoint时,是每个算子收到barriers之后,将状态和offset写到状态后端,并返回ack给jm之后。再做一次全量快照到jm内存或者自己设置的hdfs文件路径下啊。不理解在hdfs生成的checkpoint文件到底是2pc提交事务成功之后的checkpoint还是每个算子做完checkpoint。

是图1:


还是图二:


问题二:
做完2pc之后。出现了故障。做故障恢复。恢复的状态是上一次提交事务成功的地方的状态。还是上一个barriers所在算子做的checkpoint成功的地方开始恢复。


| |
zyzandmz
|
|
zyzan...@163.com
|

Re: checkpoint Kafka Offset commit failed

2023-05-04 文章 Shammon FY
Hi

看起来像是网络问题导致flink作业source节点连接kafka失败,可以检查一下kafka集群的网络或者flink作业source节点的网络是否有问题

Best,
Shammon FY

On Fri, May 5, 2023 at 9:41 AM Leonard Xu  wrote:

> 可以发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org   取消订阅来自
> user-zh@flink.apache.org  邮件列表的邮件,邮件列表的订阅管理,可以参考[1]
>
> 祝好,
> Leonard
> [1]
> https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
>
> > 2023年5月4日 下午9:00,wuzhongxiu  写道:
> >
> > 退订
> >
> >
> >
> > | |
> > go574...@163.com
> > |
> > |
> > 邮箱:go574...@163.com
> > |
> >
> >
> >
> >
> >  回复的原邮件 
> > | 发件人 | zhan...@eastcom-sw.com |
> > | 日期 | 2023年05月04日 14:54 |
> > | 收件人 | user-zh |
> > | 抄送至 | |
> > | 主题 | checkpoint Kafka Offset commit failed |
> > hi,请问在flink(1.14、1.16) checkpoint(10s)提交 kafka偏移量提示 The coordinator is
> not available
> >
> > 查看kafka集群日志都是正常的,手动也可以正确提交偏移量,重启flink
> job后也可以正常提交,运行一段时间后又会失败,请问有参数可以优化一下吗?
> >
> > flink 日志如下:
> > 2023-05-04 11:31:02,636 WARN
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] -
> Failed to commit consumer offsets for checkpoint 69153
> > org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset
> commit failed with a retriable exception. You should retry committing the
> latest consumed offsets.
> > Caused by:
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The
> coordinator is not available.
>
>


Re: checkpoint Kafka Offset commit failed

2023-05-04 文章 Leonard Xu
可以发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org   取消订阅来自 
user-zh@flink.apache.org  邮件列表的邮件,邮件列表的订阅管理,可以参考[1]

祝好,
Leonard
[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

> 2023年5月4日 下午9:00,wuzhongxiu  写道:
> 
> 退订
> 
> 
> 
> | |
> go574...@163.com
> |
> |
> 邮箱:go574...@163.com
> |
> 
> 
> 
> 
>  回复的原邮件 
> | 发件人 | zhan...@eastcom-sw.com |
> | 日期 | 2023年05月04日 14:54 |
> | 收件人 | user-zh |
> | 抄送至 | |
> | 主题 | checkpoint Kafka Offset commit failed |
> hi,请问在flink(1.14、1.16) checkpoint(10s)提交 kafka偏移量提示 The coordinator is not 
> available  
> 
> 查看kafka集群日志都是正常的,手动也可以正确提交偏移量,重启flink job后也可以正常提交,运行一段时间后又会失败,请问有参数可以优化一下吗?
> 
> flink 日志如下:
> 2023-05-04 11:31:02,636 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 69153
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing the 
> latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.



回复:checkpoint Kafka Offset commit failed

2023-05-04 文章 wuzhongxiu
退订



| |
go574...@163.com
|
|
邮箱:go574...@163.com
|




 回复的原邮件 
| 发件人 | zhan...@eastcom-sw.com |
| 日期 | 2023年05月04日 14:54 |
| 收件人 | user-zh |
| 抄送至 | |
| 主题 | checkpoint Kafka Offset commit failed |
hi,请问在flink(1.14、1.16) checkpoint(10s)提交 kafka偏移量提示 The coordinator is not 
available  

查看kafka集群日志都是正常的,手动也可以正确提交偏移量,重启flink job后也可以正常提交,运行一段时间后又会失败,请问有参数可以优化一下吗?

flink 日志如下:
2023-05-04 11:31:02,636 WARN  
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 69153
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.


Re: Flink 误报checkpoint失败

2023-05-03 文章 Yanfei Lei
hi, 扩缩容会重启作业,在作业重启期间,job manager 先启动了,还有部分task manager没启动就有可能报“Not all
required tasks are currently
running..”的错误,作业的所有task完全启动后这个错误就会消失。

Best,
Yanfei
Chen Yang  于2023年5月4日周四 09:44写道:
>
> 您好,
>
> 我的 Flink job是以 reactive 模式运行,然后用了 Kubernetes HPA 来自动扩容/缩容
> TaskManager。每当TaskManager
> 扩容/缩容的时候,Flink会在日志中报错:因为扩缩容之前的TaskManager没有在运行导致checkpoint失败,同时也有checkpoint失败的警报。
> 但实际上checkpoint 还能顺利进行, job也没有运行错误。 重启job后这个错误就会消失。想请教一下如何修复这个问题?
>
> 详细的日志如下
> 2022-12-13 05:08:22.339 [jobmanager-io-thread-1] INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 40393 for job  (488170 bytes,
> checkpointDuration=2582 ms, finalizationTime=322 ms).
> 2022-12-13 05:08:28.083 [Checkpoint Timer] INFO
>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager  - Failed to
> trigger checkpoint for job 0000 since
> Checkpoint triggering task Source: Custom Source -> Sink: Unnamed (1/79) of
> job  is not being executed at the moment.
> Aborting checkpoint. Failure reason: Not all required tasks are currently
> running..
> 2022-12-13 05:09:19.437 [Checkpoint Timer] INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
> checkpoint 40394 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1670908159435 for job
> .
> 2022-12-13 05:09:25.208 [jobmanager-io-thread-1] INFO
>  org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
> flink-ingest-sps-nv-consumer/2022-11-15T01:10:30Z//chk-40394/_metadata
> with MPU ID
> _3vKXSVBMuBM7207EpGvCXOTRQskAiPPj88DSTTn55Uzuc_76dnubmTAPBovyWbKBKU8Wxqz6SuFBJ8cZnAOH_PkGEP36KJzMFYYPmT.xZvmLnM.YX1oJSHN3VP1TXpJECY8y80psYvRWvbt2e8CMeoa9JiOWiGYGRmqLGRdlQA-
> 2022-12-13 05:09:25.747 [jobmanager-io-thread-1] INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 40394 for job  (482850 bytes,
> checkpointDuration=5982 ms, finalizationTime=330 ms).
>
> Thanks,
> Chen


Flink 误报checkpoint失败

2023-05-03 文章 Chen Yang
您好,

我的 Flink job是以 reactive 模式运行,然后用了 Kubernetes HPA 来自动扩容/缩容
TaskManager。每当TaskManager
扩容/缩容的时候,Flink会在日志中报错:因为扩缩容之前的TaskManager没有在运行导致checkpoint失败,同时也有checkpoint失败的警报。
但实际上checkpoint 还能顺利进行, job也没有运行错误。 重启job后这个错误就会消失。想请教一下如何修复这个问题?

详细的日志如下
2022-12-13 05:08:22.339 [jobmanager-io-thread-1] INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
checkpoint 40393 for job  (488170 bytes,
checkpointDuration=2582 ms, finalizationTime=322 ms).
2022-12-13 05:08:28.083 [Checkpoint Timer] INFO
 org.apache.flink.runtime.checkpoint.CheckpointFailureManager  - Failed to
trigger checkpoint for job  since
Checkpoint triggering task Source: Custom Source -> Sink: Unnamed (1/79) of
job  is not being executed at the moment.
Aborting checkpoint. Failure reason: Not all required tasks are currently
running..
2022-12-13 05:09:19.437 [Checkpoint Timer] INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 40394 (type=CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1670908159435 for job
.
2022-12-13 05:09:25.208 [jobmanager-io-thread-1] INFO
 org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
flink-ingest-sps-nv-consumer/2022-11-15T01:10:30Z//chk-40394/_metadata
with MPU ID
_3vKXSVBMuBM7207EpGvCXOTRQskAiPPj88DSTTn55Uzuc_76dnubmTAPBovyWbKBKU8Wxqz6SuFBJ8cZnAOH_PkGEP36KJzMFYYPmT.xZvmLnM.YX1oJSHN3VP1TXpJECY8y80psYvRWvbt2e8CMeoa9JiOWiGYGRmqLGRdlQA-
2022-12-13 05:09:25.747 [jobmanager-io-thread-1] INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
checkpoint 40394 for job  (482850 bytes,
checkpointDuration=5982 ms, finalizationTime=330 ms).

Thanks,
Chen


Re: checkpoint首次触发时间疑问

2023-01-04 文章 Jiangang Liu
首次快照时间是一个随机数,计算逻辑如下。你可以尝试设置下execution.checkpointing.min-pause

private long getRandomInitDelay() {
return ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints,
baseInterval + 1L);
}


aiden <18765295...@163.com> 于2023年1月5日周四 14:06写道:

> hello,我们需求是要将kafka数据落到hive中,为了避免hive小文件问题,采用了一小时checkpoint,具体参数为:
>
> bsEnv.enableCheckpointing(360);
> bsEnv.setStateBackend(new HashMapStateBackend());
>
> bsEnv.getCheckpointConfig().setCheckpointStorage("hdfs://xxx/user/flink/checkpoint/serverlog/hc");
>
> 但在执行时发现首次checkpoint并没有按照一小时触发.
> 作业执行时间为:2023-01-05 10:04:17,但首次checkpoint时间为:2023-01-05 10:34:19
> checkpoint首次触发时间是和什么有关?以及通过什么参数可以设置吗?
>


Re: flink on k8s 提交作业,使用 oss 作为 checkpoint 地址,但找不到 oss

2022-11-07 文章 Lijie Wang
 flink-oss-fs-hadoop-1.13.6.jar 这个 jar 需要放到  flink 的 lib 目录下

Best,
Lijie

highfei2011  于2022年11月1日周二 16:23写道:

> 包冲突了。
>
>
> 在 2022年11月1日 15:39,highfei2011 写道:
>
>
> flink 版本:apache flink 1.13.6 flink operator 版本: 1.2.0
> 提交命令:kubernetes-jobmanager.sh kubernetes-application 异常: Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'oss'. The scheme is directly
> supported by Flink through t he following plugin: flink-oss-fs-hadoop.
> Please ensure that each plugin resides within its own subfolder within the
> plugins directory. See https://ci.apache.org/projects/flink/flink-docs
> -stable/ops/plugins.html for more information. If you want to use a Hadoop
> file system for that scheme, please add the scheme to the configuration
> fs.allowed-fallback-filesystems. For a f ull list of supported file
> systems, please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
> 我排查了 /opt/flink/opt/目录,下面是有 flink-oss-fs-hadoop-1.13.6.jar 注:本地测试正常,仅使用
> flink operator 提交时,发生如上异常。


回复: Re:flink exactly once 写kafka,如果checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?

2022-11-02 文章 郑 致远
你好. 但是下一次cp发起之时,   kafka transaction 已经超时失败了,  sink端precommit之前,写入到kafka的数据, 
是不是就丢失了?

发件人: Xuyang 
发送时间: 2022年11月1日 23:08
收件人: user-zh@flink.apache.org 
主题: Re:flink exactly once 
写kafka,如果checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?

Hi, 应该会等到下一次做cp的时候再提交
在 2022-11-01 17:13:22,"郑 致远"  写道:
>大佬们好.
>flink exactly once 写kafka,如果flink 
>checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?
>kafka的transaction因为超时,abort后, 会导致abort之前写kafka的数据,丢失吗?


Re:flink exactly once 写kafka,如果checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?

2022-11-01 文章 Xuyang
Hi, 应该会等到下一次做cp的时候再提交
在 2022-11-01 17:13:22,"郑 致远"  写道:
>大佬们好.
>flink exactly once 写kafka,如果flink 
>checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?
>kafka的transaction因为超时,abort后, 会导致abort之前写kafka的数据,丢失吗?


flink exactly once 写kafka,如果checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?

2022-11-01 文章 郑 致远
大佬们好.
flink exactly once 写kafka,如果flink 
checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?
kafka的transaction因为超时,abort后, 会导致abort之前写kafka的数据,丢失吗?


Re: 并行度>1时实时写入hive partition table且开启了checkpoint没有同步信息到metastore

2022-09-22 文章 yuxia
那么生成了 success 文件了吗?
另外你的 sink.partition-commit.trigger 用的是 process-time(默认是 process-time) 还是 
partition-time。

Best regards,
Yuxia

- 原始邮件 -
发件人: "junjie miao" 
收件人: "user-zh" 
发送时间: 星期四, 2022年 9 月 22日 下午 2:27:46
主题: Re: Re: 并行度>1时实时写入hive partition table且开启了checkpoint没有同步信息到metastore

文件都是生成了的,主要是没有更新hive metastore。

补充下信息已经设置了'sink.partition-commit.policy.kind' = 'metastore,success-file'

 
发件人: yuxia
发送时间: 2022-09-22 14:14
收件人: user-zh
主题: Re: 并行度>1时实时写入hive partition table且开启了checkpoint没有同步信息到metastore
你用 hdfs dfs -ls 看一下对应表的路径下,是不是有文件生成。
 
Best regards,
Yuxia
 
- 原始邮件 -
发件人: "junjie miao" 
收件人: "user-zh" 
发送时间: 星期四, 2022年 9 月 22日 下午 1:59:55
主题: 并行度>1时实时写入hive partition table且开启了checkpoint没有同步信息到metastore
 
flink 1.14.5中消费kafka数据实时写入hive partition text table且开启了checkpoint,
发现当并行度为1时在hive中show partitions可以看到分区信息并能查询出结果数据,
但是当并行度>1时就查询不到分区信息同时也无法查询出结果数据。
不知道是否有人也遇到过这个问题,还是有什么其他注意点?


Re: 线上flink任务突然出现连续的checkpoint失败

2022-06-23 文章 Lijie Wang
-> Caused by: org.apache.flink.util.SerializedThrowable: Unable to close
file because the last
blockBP-1965840142-10.216.138.23-1585685654447:blk_2926076096_1852445656
does not have enough number of replicas.

从错误看是写 hdfs 的问题,建议看下 hdfs 是否正常

Best,
Lijie

陈卓宇 <2572805...@qq.com.invalid> 于2022年6月24日周五 12:00写道:

> flink版本:1.13.1
> hdfs:3+版本
> 异常日志:
>
> 2022-06-24 10:58:19,839 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline
> checkpoint 1101 by task b3d88f9ef72bda003056856c4422742d of job
> 6bd7dc46451f01e008762c9b556cb08f at zhaohy4-test-taskmanager-1-1 @
> 10.42.5.55 (dataPort=40558).
>
> org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint
> failed.
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:175)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_202]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_202]
>
> at java.lang.Thread.run(Thread.java:748)
> [?:1.8.0_202]
>
> Caused by: org.apache.flink.util.SerializedThrowable: Could not
> materialize checkpoint 1101 for operator IntervalJoin(joinType=[InnerJoin],
> windowBounds=[isRowTime=true, leftLowerBound=-129600,
> leftUpperBound=129600, leftTimeIndex=4, rightTimeIndex=4],
> where=[((hire_contract_id = id) AND (last_modify_time =
> (last_modify_time0 - 129600:INTERVAL DAY)) AND (last_modify_time <=
> (last_modify_time0 + 129600:INTERVAL DAY)))], select=[hire_contract_id,
> hire_status_code, sign_date, confirm_date, last_modify_time, proctime, id,
> hire_contract_code, ziroom_version_id, is_del, last_modify_time0]) -
> Calc(select=[hire_contract_id, hire_status_code, sign_date, confirm_date,
> last_modify_time, proctime, hire_contract_code, ziroom_version_id, is_del
> AS is_del0, last_modify_time0]) (1/1)#3.
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> ... 4 more
>
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException:
> Could not flush to file and close the file system output stream to
> hdfs://zrHdfsHa/user/flink/checkpointsdata/6bd7dc46451f01e008762c9b556cb08f/shared/5a5118ba-427f-4234-8e36-ec8d24418fe4
> in order to obtain the stream state handle
>
> at
> java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_202]
>
> at
> java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_202]
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer. ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> ... 3 more
>
> Caused by: org.apache.flink.util.SerializedThrowable: Could not flush to
> file and close the file system output stream to
> hdfs://zrHdfsHa/user/flink/checkpointsdata/6bd7dc46451f01e008762c9b556cb08f/shared/5a5118ba-427f-4234-8e36-ec8d24418fe4
> in order to obtain the stream state handle
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:373)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:143)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:101)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
> ~[flink-core-1.13.1.jar:1.13.1]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> ... 3 more
>
> Caused by: org.apache.flink.util.SerializedThrowable: Unable to close file
> because the last
> blockBP-1965840142-10.216.138.23-1585685654447:blk_2926076096_1852445656
> does not have enough number of replicas.
>
> at
> org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:966)
> ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-

????flink??????????????????checkpoint????

2022-06-23 文章 ??????
flink??1.13.1
hdfs??3+
??

2022-06-24 10:58:19,839 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline 
checkpoint 1101 by task b3d88f9ef72bda003056856c4422742d of job 
6bd7dc46451f01e008762c9b556cb08f at zhaohy4-test-taskmanager-1-1 @ 10.42.5.55 
(dataPort=40558).

org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:175)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_202]

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_202]

at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]

Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize 
checkpoint 1101 for operator IntervalJoin(joinType=[InnerJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-129600, 
leftUpperBound=129600, leftTimeIndex=4, rightTimeIndex=4], 
where=[((hire_contract_id = id) AND (last_modify_time = (last_modify_time0 
- 129600:INTERVAL DAY)) AND (last_modify_time <= (last_modify_time0 + 
129600:INTERVAL DAY)))], select=[hire_contract_id, hire_status_code, 
sign_date, confirm_date, last_modify_time, proctime, id, hire_contract_code, 
ziroom_version_id, is_del, last_modify_time0]) - 
Calc(select=[hire_contract_id, hire_status_code, sign_date, confirm_date, 
last_modify_time, proctime, hire_contract_code, ziroom_version_id, is_del AS 
is_del0, last_modify_time0]) (1/1)#3.

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]

... 4 more

Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
Could not flush to file and close the file system output stream to 
hdfs://zrHdfsHa/user/flink/checkpointsdata/6bd7dc46451f01e008762c9b556cb08f/shared/5a5118ba-427f-4234-8e36-ec8d24418fe4
 in order to obtain the stream state handle

at 
java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_202]

at 
java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_202]

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]

at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.

Re: Re: 使用join+聚合时,checkpoint异常

2022-06-21 文章 Lincoln Lee
Hi,
   确认了下, cdc source 目前全量结束后 task 还是保持的,不会 finish, 这里的 finished task 应该是你提到的
" 使用了lookup join + 外部mysql维表,任务开始时,全量加载了一次维表数据,对应task状态就变成了finished。"

Best,
Lincoln Lee


amber_...@qq.com.INVALID  于2022年6月21日周二 14:35写道:

> 非常感谢!你的建议很有用。
>
> 我在代码中添加execution.checkpointing.checkpoints-after-tasks-finish.enabled相关配置,完美解决了问题。
> 我使用了lookup join + 外部mysql维表,任务开始时,全量加载了一次维表数据,对应task状态就变成了finished。
>
> best wishes!
>
>
> amber_...@qq.com
>
> 发件人: Lincoln Lee
> 发送时间: 2022-06-21 11:18
> 收件人: user-zh
> 主题: Re: Re: 使用join+聚合时,checkpoint异常
> Hi,
>   从描述来看, 因为使用了 cdc source (猜测是先 全量 后增量同步),  全量阶段完成时对应的 task 会到达 finished
> 状态, 在 1.14 版本中, 对应的配置项 `
> execution.checkpointing.checkpoints-after-tasks-finish.enabled` 默认值是关闭的
> (1.15+ 版本默认会开启), 可以开启或升级到 1.15 版本后再观察下
>
> > because Some tasks of the job have already finished and checkpointing
> with finished tasks is not enabled
>
> Best,
> Lincoln Lee
>
>
> amber_...@qq.com.INVALID  于2022年6月21日周二 10:27写道:
>
> > 感谢!
> > 未发生背压,但我在日志中发现了一些异常信息,如下:
> > Failed to trigger checkpoint for job 297c5a840f8fd3a1cbcb63825200e8d4
> > because Some tasks of the job have already finished and checkpointing
> with
> > finished tasks is not enabled. Failure reason: Not all required tasks are
> > currently running.
> >
> > 通过web ui可以看到,确实有一部分任务是finished状态。
> >
> > 是否因为我关联多张维表的时候,同时使用了lookup join和普通join呢?
> > --
> > amber_...@qq.com
> >
> >
> > *发件人:* Shengkai Fang 
> > *发送时间:* 2022-06-21 09:53
> > *收件人:* user-zh 
> > *主题:* Re: 使用join+聚合时,checkpoint异常
> > hi.
> >
> > 这种情况下,最好查看一下是否发生了反压,同时看看日志之中是否有相关的异常信息。
> >
> > Best,
> > Shengkai
> >
> > amber_...@qq.com.INVALID  于2022年6月21日周二
> 09:43写道:
> >
> > > 您好!
> > > 我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
> > > 当我提交普通数据同步任务时,一切正常;
> > > 当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed
> > > Memory使用率始终是100%;
> > > 以下是我的checkpoint配置:
> > >
> > >
> > > 我尝试增加Task Managed内存,但使用率总是100%;
> > > 当我关闭增量检查点时,无任何变化;
> > > 当我将State Backend切换为hashmap时,Managed
> Memory使用率回归正常,但checkpoint仍然无法工作;
> > >
> > > 期待你的回复。
> > > 祝好!
> > > --
> > > amber_...@qq.com
> > >
> >
> >
>


Re: Re: 使用join+聚合时,checkpoint异常

2022-06-21 文章 amber_...@qq.com.INVALID
非常感谢!你的建议很有用。

我在代码中添加execution.checkpointing.checkpoints-after-tasks-finish.enabled相关配置,完美解决了问题。
我使用了lookup join + 外部mysql维表,任务开始时,全量加载了一次维表数据,对应task状态就变成了finished。

best wishes!


amber_...@qq.com
 
发件人: Lincoln Lee
发送时间: 2022-06-21 11:18
收件人: user-zh
主题: Re: Re: 使用join+聚合时,checkpoint异常
Hi,
  从描述来看, 因为使用了 cdc source (猜测是先 全量 后增量同步),  全量阶段完成时对应的 task 会到达 finished
状态, 在 1.14 版本中, 对应的配置项 `
execution.checkpointing.checkpoints-after-tasks-finish.enabled` 默认值是关闭的
(1.15+ 版本默认会开启), 可以开启或升级到 1.15 版本后再观察下
 
> because Some tasks of the job have already finished and checkpointing
with finished tasks is not enabled
 
Best,
Lincoln Lee
 
 
amber_...@qq.com.INVALID  于2022年6月21日周二 10:27写道:
 
> 感谢!
> 未发生背压,但我在日志中发现了一些异常信息,如下:
> Failed to trigger checkpoint for job 297c5a840f8fd3a1cbcb63825200e8d4
> because Some tasks of the job have already finished and checkpointing with
> finished tasks is not enabled. Failure reason: Not all required tasks are
> currently running.
>
> 通过web ui可以看到,确实有一部分任务是finished状态。
>
> 是否因为我关联多张维表的时候,同时使用了lookup join和普通join呢?
> --
> amber_...@qq.com
>
>
> *发件人:* Shengkai Fang 
> *发送时间:* 2022-06-21 09:53
> *收件人:* user-zh 
> *主题:* Re: 使用join+聚合时,checkpoint异常
> hi.
>
> 这种情况下,最好查看一下是否发生了反压,同时看看日志之中是否有相关的异常信息。
>
> Best,
> Shengkai
>
> amber_...@qq.com.INVALID  于2022年6月21日周二 09:43写道:
>
> > 您好!
> > 我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
> > 当我提交普通数据同步任务时,一切正常;
> > 当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed
> > Memory使用率始终是100%;
> > 以下是我的checkpoint配置:
> >
> >
> > 我尝试增加Task Managed内存,但使用率总是100%;
> > 当我关闭增量检查点时,无任何变化;
> > 当我将State Backend切换为hashmap时,Managed Memory使用率回归正常,但checkpoint仍然无法工作;
> >
> > 期待你的回复。
> > 祝好!
> > --
> > amber_...@qq.com
> >
>
>


Re: Re: 使用join+聚合时,checkpoint异常

2022-06-20 文章 Lincoln Lee
Hi,
  从描述来看, 因为使用了 cdc source (猜测是先 全量 后增量同步),  全量阶段完成时对应的 task 会到达 finished
状态, 在 1.14 版本中, 对应的配置项 `
execution.checkpointing.checkpoints-after-tasks-finish.enabled` 默认值是关闭的
(1.15+ 版本默认会开启), 可以开启或升级到 1.15 版本后再观察下

> because Some tasks of the job have already finished and checkpointing
with finished tasks is not enabled

Best,
Lincoln Lee


amber_...@qq.com.INVALID  于2022年6月21日周二 10:27写道:

> 感谢!
> 未发生背压,但我在日志中发现了一些异常信息,如下:
> Failed to trigger checkpoint for job 297c5a840f8fd3a1cbcb63825200e8d4
> because Some tasks of the job have already finished and checkpointing with
> finished tasks is not enabled. Failure reason: Not all required tasks are
> currently running.
>
> 通过web ui可以看到,确实有一部分任务是finished状态。
>
> 是否因为我关联多张维表的时候,同时使用了lookup join和普通join呢?
> --
> amber_...@qq.com
>
>
> *发件人:* Shengkai Fang 
> *发送时间:* 2022-06-21 09:53
> *收件人:* user-zh 
> *主题:* Re: 使用join+聚合时,checkpoint异常
> hi.
>
> 这种情况下,最好查看一下是否发生了反压,同时看看日志之中是否有相关的异常信息。
>
> Best,
> Shengkai
>
> amber_...@qq.com.INVALID  于2022年6月21日周二 09:43写道:
>
> > 您好!
> > 我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
> > 当我提交普通数据同步任务时,一切正常;
> > 当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed
> > Memory使用率始终是100%;
> > 以下是我的checkpoint配置:
> >
> >
> > 我尝试增加Task Managed内存,但使用率总是100%;
> > 当我关闭增量检查点时,无任何变化;
> > 当我将State Backend切换为hashmap时,Managed Memory使用率回归正常,但checkpoint仍然无法工作;
> >
> > 期待你的回复。
> > 祝好!
> > --
> > amber_...@qq.com
> >
>
>


Re: Re: 使用join+聚合时,checkpoint异常

2022-06-20 文章 amber_...@qq.com.INVALID
感谢!
未发生背压,但我在日志中发现了一些异常信息,如下:
Failed to trigger checkpoint for job 297c5a840f8fd3a1cbcb63825200e8d4 because 
Some tasks of the job have already finished and checkpointing with finished 
tasks is not enabled. Failure reason: Not all required tasks are currently 
running.

通过web ui可以看到,确实有一部分任务是finished状态。

是否因为我关联多张维表的时候,同时使用了lookup join和普通join呢?


amber_...@qq.com
 
发件人: Shengkai Fang
发送时间: 2022-06-21 09:53
收件人: user-zh
主题: Re: 使用join+聚合时,checkpoint异常
hi.
 
这种情况下,最好查看一下是否发生了反压,同时看看日志之中是否有相关的异常信息。
 
Best,
Shengkai
 
amber_...@qq.com.INVALID  于2022年6月21日周二 09:43写道:
 
> 您好!
> 我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
> 当我提交普通数据同步任务时,一切正常;
> 当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed
> Memory使用率始终是100%;
>     以下是我的checkpoint配置:
>
>
> 我尝试增加Task Managed内存,但使用率总是100%;
> 当我关闭增量检查点时,无任何变化;
> 当我将State Backend切换为hashmap时,Managed Memory使用率回归正常,但checkpoint仍然无法工作;
>
> 期待你的回复。
> 祝好!
> --
> amber_...@qq.com
>


Re:使用join+聚合时,checkpoint异常

2022-06-20 文章 lxk



你好,图片挂了,可以尝试使用图床工具上传图片。













在 2022-06-21 09:42:54,"amber_...@qq.com.INVALID"  写道:

您好!
我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
当我提交普通数据同步任务时,一切正常;
当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed 
Memory使用率始终是100%;
以下是我的checkpoint配置:



我尝试增加Task Managed内存,但使用率总是100%;
当我关闭增量检查点时,无任何变化;
当我将State Backend切换为hashmap时,Managed Memory使用率回归正常,但checkpoint仍然无法工作;


期待你的回复。
祝好!
amber_...@qq.com

Re: 使用join+聚合时,checkpoint异常

2022-06-20 文章 Shengkai Fang
hi.

这种情况下,最好查看一下是否发生了反压,同时看看日志之中是否有相关的异常信息。

Best,
Shengkai

amber_...@qq.com.INVALID  于2022年6月21日周二 09:43写道:

> 您好!
> 我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
> 当我提交普通数据同步任务时,一切正常;
> 当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed
> Memory使用率始终是100%;
>     以下是我的checkpoint配置:
>
>
> 我尝试增加Task Managed内存,但使用率总是100%;
> 当我关闭增量检查点时,无任何变化;
> 当我将State Backend切换为hashmap时,Managed Memory使用率回归正常,但checkpoint仍然无法工作;
>
> 期待你的回复。
> 祝好!
> --
> amber_...@qq.com
>


使用join+聚合时,checkpoint异常

2022-06-20 文章 amber_...@qq.com.INVALID
您好!
我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
当我提交普通数据同步任务时,一切正常;
当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed 
Memory使用率始终是100%;
以下是我的checkpoint配置:


我尝试增加Task Managed内存,但使用率总是100%;
当我关闭增量检查点时,无任何变化;
当我将State Backend切换为hashmap时,Managed Memory使用率回归正常,但checkpoint仍然无法工作;

期待你的回复。
祝好!


amber_...@qq.com


Re: Unaligned Checkpoint

2022-06-12 文章 Zhanghao Chen
你好,

Unaligned checkpoint 是个底层特性,要使用的话只要设置 Flink 参数 
execution.checkpointing.unaligned = true 就行,在 SQL client 中,可以使用 SET "key" = 
"value" 的语法设置 Flink 参数的值。

Unaligned checkpoint 较之 aligned checkpoint 主要的改变在于

  *   unaligned cp 在输入缓冲区收到第一个 cp barrier 
的时候立即触发快照并直接输出至下游;代价是快照需要记录缓冲区中的数据来保证一致性,产生更多 io 并增大 cp 大小。
  *   aligned cp 在算子收到最后一个 cp barrier 完成 barrier 对齐后才触发快照,barrier 对齐期间较早收到 
barrier 的 input channel 会被阻塞,在反压时阻塞时间会显著增加,导致 cp 速度变慢;好处是 barrier 
对齐的过程使得快照不需要记录缓冲等待队列中的数据就可以保证一致性。

Best,
Zhanghao Chen

From: 小昌同学 
Sent: Saturday, June 11, 2022 17:18
To: user-zh@flink.apache.org 
Subject: Unaligned Checkpoint

大佬们可以说说Unaligned Checkpoint的实现吗  看了不少文档 没有太看懂  我如果想在sql里面实现  这个该怎么设置啊  请大佬们指教


| |
小昌同学
|
|
ccc0606fight...@163.com
|


Unaligned Checkpoint

2022-06-11 文章 小昌同学
大佬们可以说说Unaligned Checkpoint的实现吗  看了不少文档 没有太看懂  我如果想在sql里面实现  这个该怎么设置啊  请大佬们指教


| |
小昌同学
|
|
ccc0606fight...@163.com
|

oss checkpoint fail

2022-05-12 文章 json
使用oss 存储checkpoint,做几次checkpoint就会出现下面报错,导致checkpoint失败


Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.ClientException: 
Connection error due to: Trying to access closed classloader. Please check if 
you store classloaders directly or indirectly in static fields. If the 
stacktrace suggests that the leak occurs in a third party library and cannot be 
fixed immediately, you can disable this check with the configuration 
'classloader.check-leaked-classloader'.
[ErrorCode]: Unknown
[RequestId]: Unknown
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequestImpl(ServiceClient.java:170)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequest(ServiceClient.java:70)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.send(OSSOperation.java:83)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:145)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:102)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.writeObjectInternal(OSSObjectOperation.java:897)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.putObject(OSSObjectOperation.java:129)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.putObject(OSSClient.java:471)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.putObject(OSSClient.java:455)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.uploadObject(AliyunOSSFileSystemStore.java:414)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSOutputStream.close(AliyunOSSOutputStream.java:87)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
 ~[?:?]
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:131)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:40)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:323)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1182)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
... 10 more
Caused by: java.lang.IllegalStateException: Trying to access closed 
classloader. Please check if you store classloaders directly or indirectly in 
static fields. If the stacktrace suggests that the leak occurs in a third party 
library and cannot be fixed immediately, you can disable this check with the 
configuration 'classloader.check-leaked-classloader'.
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at org.apache.commons.logging.LogFactory$4.run(LogFactory.java:1307) ~[?:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_241]
at org.apache.commons.logging.LogFactory.getResources(LogFactory.java:1325) 
~[?:?]
at 
org.apache.commons.logging.LogFactory.getConfigurationFile(LogFactory.java:1403)
 ~[?:?]
at org.apache.commons.logging.LogFactory.getFactory(LogFactory.java:455) ~[?:?]
at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:657) ~[?:?]
at 
org.apache.http.impl.conn.DefaultHttpResponseParser.(DefaultHttpResponseParser.java:59)
 ~[?:?]
at 
org.apache.http.impl.conn.DefaultHttpResponseParserFactory.create(DefaultHttpResponseParserFactory.java:76)
 ~[?:?]
at 
org.apache.http.impl.DefaultBHttpClientConnection.(DefaultBHttpClientConnection.java:99)
 ~[?:?]
at 
org.apache.http.impl.conn.DefaultManagedHttpClientConnection.(DefaultManagedHttpClientConnection.java:74)
 ~[?:?]
at 
org.apache.http.impl.conn.LoggingManagedHttpClientConnection.(LoggingManagedHttpClientConnection.java:66)
 ~[?:?]
at 
org.apache.http.impl.conn.ManagedHttpClientConnectionFactory.create(ManagedHttpClientConnectionFactory.java:127)
 ~[?:?]
at 
org.apache.http.impl.conn.ManagedHttpClientConnectionFactory.create(ManagedHttpClientConnectionFactory.java:57

Re: Flink - 1.11.6 - FsStateBackend没有存储checkpoint

2022-04-30 文章 Arthur Li
打扰了,解决了,原因是因为启动时没有配置savepoint路径。


> 2022年4月30日 12:09,Arthur Li  写道:
> 
> 大家好,
> 
> 
> 我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢
> 1. 启动checkpoint
> 2. 设置statebackend为FsStateBackend
> 3. 从socketTextStream读取数据,统计单词个数
>(“hello”, 5), (“world”, 1)
> 4. 通过触发异常,来模拟终止程序
> 5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值
>(“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello”, 6)
>而在实际输出结果为(“hello”, 1)
> 
> 环境和版本信息
> 1. MacOS - Oracle JDK 1.8
> 2. 版本信息
> 
>   UTF-8
>   1.11.6
>   scala
>   1.8
>   2.12
>   ${target.java.version}
>   ${target.java.version}
>   2.12.1
> 
> 
> 代码
> object RestartStrategyFsStateBackend {
>  def main(args: Array[String]): Unit = {
>val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>env.enableCheckpointing(1000L) 
>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2))
> 
>val backendPath = 
> "file:///Users/arthur/Documents/Workspace/java/quickstart" +
>  "/flink-spring/src/main/resources/backend.out/restartstrategyv3"
>env.setStateBackend(new FsStateBackend(backendPath))
> 
>// socket数据源
>env.socketTextStream("localhost", 7077)
>  .map(value => {
>if (value == "restart") {
>  throw new RuntimeException("restart is triggered, ps~")
>}
>(value, 1)
>  }
>  )
>  .keyBy(_._1)
>  .sum(1)
>  .print("RestartStrategy")
> 
>env.execute("RestartStrategy")
>  }
> }
> 
> BR.
> Arthur
> 
> 
> 
> 
> 



Flink - 1.11.6 - FsStateBackend没有存储checkpoint

2022-04-29 文章 Arthur Li
大家好,


我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢
1. 启动checkpoint
2. 设置statebackend为FsStateBackend
3. 从socketTextStream读取数据,统计单词个数
(“hello”, 5), (“world”, 1)
4. 通过触发异常,来模拟终止程序
5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值
(“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello”, 6)
而在实际输出结果为(“hello”, 1)

环境和版本信息
1. MacOS - Oracle JDK 1.8
2. 版本信息

   UTF-8
   1.11.6
   scala
   1.8
   2.12
   ${target.java.version}
   ${target.java.version}
   2.12.1


代码
object RestartStrategyFsStateBackend {
  def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.enableCheckpointing(1000L) 
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2))

val backendPath = 
"file:///Users/arthur/Documents/Workspace/java/quickstart" +
  "/flink-spring/src/main/resources/backend.out/restartstrategyv3"
env.setStateBackend(new FsStateBackend(backendPath))

// socket数据源
env.socketTextStream("localhost", 7077)
  .map(value => {
if (value == "restart") {
  throw new RuntimeException("restart is triggered, ps~")
}
(value, 1)
  }
  )
  .keyBy(_._1)
  .sum(1)
  .print("RestartStrategy")

env.execute("RestartStrategy")
  }
}

BR.
Arthur





 

Re: Re:flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败

2022-03-08 文章 Yun Tang
Hi

一般是卡在最后一步从JM写checkpoint meta上面了,建议使用jstack等工具检查一下JM的cpu栈,看问题出在哪里。


祝好
唐云

From: Sun.Zhu <17626017...@163.com>
Sent: Tuesday, March 8, 2022 14:12
To: user-zh@flink.apache.org 
Subject: Re:flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败

图挂了

https://postimg.cc/Z9XdxwSk













在 2022-03-08 14:05:39,"Sun.Zhu" <17626017...@163.com> 写道:

hi all,
flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?








Re:flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败

2022-03-07 文章 Sun.Zhu
图挂了

https://postimg.cc/Z9XdxwSk













在 2022-03-08 14:05:39,"Sun.Zhu" <17626017...@163.com> 写道:

hi all,
flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?





 

flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败

2022-03-07 文章 Sun.Zhu
hi all,
flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?

Re: flink 反压导致checkpoint超时,从而导致任务失败问题

2022-03-03 文章 yu'an huang
你好,我检查了下关于checkpoint的文档:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/
 
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/>
tolerable checkpoint failure number: This defines how many consecutive 
checkpoint failures will be tolerated, before the whole job is failed over. The 
default value is 0, which means no checkpoint failures will be tolerated, and 
the job will fail on first reported checkpoint failure.

可以为作业设置容忍checkpoint失败的, 你可以像文档中说加下相关设置:
// only two consecutive checkpoint failures are tolerated
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
希望可以帮到你




> On 4 Mar 2022, at 10:07 AM, yu'an huang  <mailto:h.yuan...@gmail.com>> wrote:
> 
> 你好,checkpoint超时默认不会导致作业重启,可以提供下JM log看看作业为什么会重启吗?
> 
>> On 3 Mar 2022, at 9:15 PM, kong <62...@163.com <mailto:62...@163.com>> wrote:
>> 
>> hello,我最近遇到一个问题:
>> 我通过flink消费kafka数据,job 图大概是这样的:Source -> map -> filter -> flatMap -> Map -> 
>> Sink
>> 在一瞬间kafka的producer端会产生大量的数据,导致flink无法消费完,我的checkpoint设置的是10分钟;
>> 最后会产生Checkpoint expired before 
>> completing.的错误,导致job重启,从而导致从上一个checkpoint恢复,然后重复消费数据,又导致checkpoint超时,死循环了。
>> 
>> 
>> 不知道有什么好办法解决该问题。
>> 多谢~
>> 
> 



Re: flink 反压导致checkpoint超时,从而导致任务失败问题

2022-03-03 文章 yu'an huang
你好,checkpoint超时默认不会导致作业重启,可以提供下JM log看看作业为什么会重启吗?

> On 3 Mar 2022, at 9:15 PM, kong <62...@163.com> wrote:
> 
> hello,我最近遇到一个问题:
> 我通过flink消费kafka数据,job 图大概是这样的:Source -> map -> filter -> flatMap -> Map -> 
> Sink
> 在一瞬间kafka的producer端会产生大量的数据,导致flink无法消费完,我的checkpoint设置的是10分钟;
> 最后会产生Checkpoint expired before 
> completing.的错误,导致job重启,从而导致从上一个checkpoint恢复,然后重复消费数据,又导致checkpoint超时,死循环了。
> 
> 
> 不知道有什么好办法解决该问题。
> 多谢~
> 



flink 反压导致checkpoint超时,从而导致任务失败问题

2022-03-03 文章 kong
hello,我最近遇到一个问题:
我通过flink消费kafka数据,job 图大概是这样的:Source -> map -> filter -> flatMap -> Map -> Sink
在一瞬间kafka的producer端会产生大量的数据,导致flink无法消费完,我的checkpoint设置的是10分钟;
最后会产生Checkpoint expired before 
completing.的错误,导致job重启,从而导致从上一个checkpoint恢复,然后重复消费数据,又导致checkpoint超时,死循环了。


不知道有什么好办法解决该问题。
多谢~



Re: flink 不触发checkpoint

2022-02-20 文章 Tony Wei
Hi,

有考慮升級 1.14 嗎?Flink 1.14 支持了 FLIP-147,讓 Flink 在 task 為 finished 狀態時仍能觸發
checkpoint [1, 2]。

[1]
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

best regards,

RS  於 2022年2月18日 週五 下午5:27寫道:

> 1. 图片挂了,看不到,尽量用文字,或者用图床等工具
> 2. 启动任务有配置checkpoint吗?
>
>
>
>
>
>
>
>
>
>
> 在 2022-02-17 11:40:04,"董少杰"  写道:
>
>
> flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
> flink版本1.12.2。
> 谢谢!
>
>
>
>
>
>
> | |
> 董少杰
> |
> |
> eric21...@163.com
> |


Re:flink 不触发checkpoint

2022-02-18 文章 RS
1. 图片挂了,看不到,尽量用文字,或者用图床等工具
2. 启动任务有配置checkpoint吗?










在 2022-02-17 11:40:04,"董少杰"  写道:

flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
flink版本1.12.2。
谢谢!






| |
董少杰
|
|
eric21...@163.com
|

flink 不触发checkpoint

2022-02-16 文章 董少杰
flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
flink版本1.12.2。
谢谢!






| |
董少杰
|
|
eric21...@163.com
|

Re: 关于streamFileSink在checkpoint下生成文件问题

2022-01-11 文章 Chang Li
直接用的开源版本吗?还是公司内部有改动,原生的cp是固定频率,而很多公司离线计算都是整点触发的,为了减少延迟,会自定义在整点触发一次cp,开源目前没有这个feature

黄志高  于2021年12月1日周三 21:53写道:

> hi,各位大佬,咨询个问题
>
>  
> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
>
>
>
>


Re: Re: 关于streamFileSink在checkpoint下生成文件问题

2022-01-11 文章 Chang Li
直接用的开源版本吗?还是公司内部有改动,原生的cp是固定频率,而很多公司离线计算都是整点触发的,为了减少延迟,会自定义在整点触发一次cp,开源目前没有这个feature

黄志高  于2021年12月2日周四 14:14写道:

> |
>
>
>
>
> 32684
> |
> COMPLETED
> | 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B |
> | | 32683 |
> COMPLETED
> | 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B |
> | | 32682 |
> COMPLETED
> | 8/8 | 13:32:36 | 13:32:39 | 2s | 126 KB | 0 B |
> | | 32681 |
> COMPLETED
> | 8/8 | 13:22:36 | 13:22:39 | 2s | 125 KB | 0 B |
> | | 32680 |
> COMPLETED
> | 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B |
> | | 32679 |
> COMPLETED
> | 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B |
> 上图是checkpoint
>
>
> 这个是在11月30号0时段生成的文件
> 2021-11-30 00:00:011080827 athena_other-0-217891.gz
> 2021-11-30 00:02:424309209 athena_other-0-217892.gz
> 2021-11-30 00:12:403902474 athena_other-0-217893.gz
> 2021-11-30 00:22:403886322 athena_other-0-217894.gz
> 2021-11-30 00:32:403988037 athena_other-0-217895.gz
> 2021-11-30 00:42:403892343 athena_other-0-217896.gz
> 2021-11-30 00:52:392972183 athena_other-0-217897.gz
> 2021-11-30 00:00:011125774 athena_other-1-219679.gz
> 2021-11-30 00:02:424338748 athena_other-1-219680.gz
> 2021-11-30 00:12:404204571 athena_other-1-219681.gz
> 2021-11-30 00:22:403852791 athena_other-1-219682.gz
> 2021-11-30 00:32:404025214 athena_other-1-219683.gz
> 2021-11-30 00:42:404205107 athena_other-1-219684.gz
> 2021-11-30 00:52:392922192 athena_other-1-219685.gz
> 2021-11-30 00:00:011103734 athena_other-2-220084.gz
>
>
> 这个是1点生成的文件
> 2021-11-30 01:00:011228793 athena_other-0-217951.gz
> 2021-11-30 01:02:424243566 athena_other-0-217952.gz
> 2021-11-30 01:12:404106305 athena_other-0-217953.gz
> 2021-11-30 01:22:404456214 athena_other-0-217954.gz
> 2021-11-30 01:32:414303156 athena_other-0-217955.gz
> 2021-11-30 01:42:404688872 athena_other-0-217956.gz
> 2021-11-30 01:52:403251910 athena_other-0-217957.gz
> 2021-11-30 01:00:011163354 athena_other-1-219736.gz
> 2021-11-30 01:02:424405233 athena_other-1-219737.gz
> 2021-11-30 01:12:404094502 athena_other-1-219738.gz
> 2021-11-30 01:22:404395071 athena_other-1-219739.gz
> 2021-11-30 01:32:404205169 athena_other-1-219740.gz
> 2021-11-30 01:42:404432610 athena_other-1-219741.gz
> 2021-11-30 01:52:403224111 athena_other-1-219742.gz
> 2021-11-30 01:00:011163964 athena_other-2-220137.gz
>
>
>
>
> 之前的截图无法发送,我把文件贴出来,打扰了
>
>
>
>
>
>
>
> 在 2021-12-02 13:52:43,"黄志高"  写道:
>
>
>
>
>
> Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-12-02 11:37:31,"Caizhi Weng"  写道:
> >Hi!
> >
> >邮件里看不到图片和附件,建议使用外部图床。
> >
> >partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
> >exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。
> >
> >黄志高  于2021年12月1日周三 下午9:53写道:
> >
> >> hi,各位大佬,咨询个问题
> >>
> >>
> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
> >>
> >>
> >>
> >>
>
>
>
>
>
>


Re: flink 无法checkpoint问题

2021-12-29 文章 Caizhi Weng
Hi!

图片无法显示,建议使用外部图床上传。

checkpoint 慢的原因可能有很多,最可能的原因是由于算子处理数据太慢导致反压(可以通过 Flink web UI 每个节点的 busy
百分比大致看出来)。建议检查资源是否充足,数据是否倾斜,gc 是否过于频繁等。

紫月幽魔灵  于2021年12月28日周二 10:38写道:

> 版本:flink版本1.14.0
> 问题: 使用flink 1.14.0版本提交到jdk1.7版本的yarn集群上checkpoint无法生成,一直处于IN_PROGRESS状态
> 提交命令如下:
> ./bin/flinksql-submit.sh \
> --sql sqlserver-cdc-to-kafka.sql \
> -m yarn-cluster \
> -ynm sqlserverTOkafka \
> -ys 2 \
> -yjm 1024 \
> -ytm 1024 \
> -yid application_1640657115196_0001 \
> -yD yarn.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD containerized.master.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD containerized.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD log4j2.formatMsgNoLookups=true
> 这是什么原因造成的呢?
>


flink ????checkpoint????

2021-12-27 文章 ??????????
:flink1.14.0
: flink 
1.14.0??jdk1.7??yarn??checkpoint,IN_PROGRESS

:
./bin/flinksql-submit.sh \
--sql sqlserver-cdc-to-kafka.sql \
-m yarn-cluster \
-ynm sqlserverTOkafka \
-ys 2 \
-yjm 1024 \
-ytm 1024 \
-yid application_1640657115196_0001 \
-yD yarn.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
-yD containerized.master.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
-yD containerized.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
-yD log4j2.formatMsgNoLookups=true

?

Re:Re: 关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章 黄志高
|




32684
|
COMPLETED
| 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B |
| | 32683 |
COMPLETED
| 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B |
| | 32682 |
COMPLETED
| 8/8 | 13:32:36 | 13:32:39 | 2s | 126 KB | 0 B |
| | 32681 |
COMPLETED
| 8/8 | 13:22:36 | 13:22:39 | 2s | 125 KB | 0 B |
| | 32680 |
COMPLETED
| 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B |
| | 32679 |
COMPLETED
| 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B |
上图是checkpoint


这个是在11月30号0时段生成的文件
2021-11-30 00:00:011080827 athena_other-0-217891.gz
2021-11-30 00:02:424309209 athena_other-0-217892.gz
2021-11-30 00:12:403902474 athena_other-0-217893.gz
2021-11-30 00:22:403886322 athena_other-0-217894.gz
2021-11-30 00:32:403988037 athena_other-0-217895.gz
2021-11-30 00:42:403892343 athena_other-0-217896.gz
2021-11-30 00:52:392972183 athena_other-0-217897.gz
2021-11-30 00:00:011125774 athena_other-1-219679.gz
2021-11-30 00:02:424338748 athena_other-1-219680.gz
2021-11-30 00:12:404204571 athena_other-1-219681.gz
2021-11-30 00:22:403852791 athena_other-1-219682.gz
2021-11-30 00:32:404025214 athena_other-1-219683.gz
2021-11-30 00:42:404205107 athena_other-1-219684.gz
2021-11-30 00:52:392922192 athena_other-1-219685.gz
2021-11-30 00:00:011103734 athena_other-2-220084.gz


这个是1点生成的文件
2021-11-30 01:00:011228793 athena_other-0-217951.gz
2021-11-30 01:02:424243566 athena_other-0-217952.gz
2021-11-30 01:12:404106305 athena_other-0-217953.gz
2021-11-30 01:22:404456214 athena_other-0-217954.gz
2021-11-30 01:32:414303156 athena_other-0-217955.gz
2021-11-30 01:42:404688872 athena_other-0-217956.gz
2021-11-30 01:52:403251910 athena_other-0-217957.gz
2021-11-30 01:00:011163354 athena_other-1-219736.gz
2021-11-30 01:02:424405233 athena_other-1-219737.gz
2021-11-30 01:12:404094502 athena_other-1-219738.gz
2021-11-30 01:22:404395071 athena_other-1-219739.gz
2021-11-30 01:32:404205169 athena_other-1-219740.gz
2021-11-30 01:42:404432610 athena_other-1-219741.gz
2021-11-30 01:52:403224111 athena_other-1-219742.gz
2021-11-30 01:00:011163964 athena_other-2-220137.gz




之前的截图无法发送,我把文件贴出来,打扰了







在 2021-12-02 13:52:43,"黄志高"  写道:




Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy














在 2021-12-02 11:37:31,"Caizhi Weng"  写道:
>Hi!
>
>邮件里看不到图片和附件,建议使用外部图床。
>
>partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
>exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。
>
>黄志高  于2021年12月1日周三 下午9:53写道:
>
>> hi,各位大佬,咨询个问题
>>
>>  
>> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
>>
>>
>>
>>





 

Re: 关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章 Caizhi Weng
Hi!

邮件里看不到图片和附件,建议使用外部图床。

partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。

黄志高  于2021年12月1日周三 下午9:53写道:

> hi,各位大佬,咨询个问题
>
>  
> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
>
>
>
>


关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章 黄志高
hi,各位大佬,咨询个问题
   
我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看

Flink checkpoint文件大小与对应内存大小映射关系

2021-12-01 文章 mayifan
Hi,All~!

麻烦大家一个问题,有大佬了解过checkpoint文件大小与实际内存对应的状态数据大小的映射关系吗?

比如Fs状态后端checkpoint后文件大小是1MB,对应的状态数据在内存中占用大概是多少呢?

感谢答复~!

Re: flink的job运行一段时间后, checkpoint就一直失败

2021-11-18 文章 Caizhi Weng
Hi!

checkpoint 超时有很多可能性。最常见的原因是超时的节点太忙阻塞了 checkpoint(包括计算资源不足,或者数据有倾斜等),这可以通过看
Flink web UI 上的 busy 以及反压信息判断;另外一个常见原因是 gc 太频繁,可以通过设置 jvm 参数打印出 gc log 观察。

yu...@kiscloud.net  于2021年11月18日周四 下午2:54写道:

> flink的job运行一段时间后, checkpoint就一直失败,信息如下:
> ID
> Status
> Acknowledged
> Trigger Time
> Latest Acknowledgement
> End to End Duration
> State Size
> Buffered During Alignment
> 295
> FAILED
> 30/5011:55:3811:55:391h 0m 0s205 KB0 B
> Checkpoint Detail:
> Path: - Discarded: - Failure Message: Checkpoint expired before completing.
> Operators:
> Name
> Acknowledged
> Latest Acknowledgment
> End to End Duration
> State Size
> Buffered During Alignment
> Source: dw-member
> 6/10 (60%)11:55:391s7.08 KB0 B
> Source: wi-order
> 6/10 (60%)11:55:391s7.11 KB0 B
> Source: dw-pay
> 6/10 (60%)11:55:391s7.11 KB0 B
> RecordTransformOperator
> 6/10 (60%)11:55:391s98.8 KB0 B
> RecordComputeOperator -> Sink: dw-record-data-sink
> 6/10 (60%)11:55:391s85.1 KB0 B
> SubTasks:
> End to End Duration
> State Size
> Checkpoint Duration (Sync)
> Checkpoint Duration (Async)
> Alignment Buffered
> Alignment Duration
> Minimum1s14.2 KB7ms841ms0 B13ms
> Average1s14.2 KB94ms1s0 B13ms
> Maximum1s14.2 KB181ms1s0 B15ms
> ID
> Acknowledgement Time
> E2E Duration
> State Size
> Checkpoint Duration (Sync)
> Checkpoint Duration (Async)
> Align Buffered
> Align Duration
> 1n/a
> 211:55:391s14.2 KB8ms1s0 B15ms
> 3n/a
> 411:55:391s14.2 KB181ms1s0 B13ms
> 5n/a
> 611:55:391s14.2 KB8ms1s0 B14ms
> 711:55:391s14.2 KB181ms961ms0 B13ms
> 8n/a
> 911:55:391s14.2 KB181ms841ms0 B13ms
> 1011:55:391s14.2 KB7ms1s0 B14ms
>
>
> 请问,这类问题如何排查,有没有好的建议或者最佳实践?谢谢!
>


flink的job运行一段时间后, checkpoint就一直失败

2021-11-17 文章 yu...@kiscloud.net
flink的job运行一段时间后, checkpoint就一直失败,信息如下:
ID
Status
Acknowledged
Trigger Time
Latest Acknowledgement
End to End Duration
State Size
Buffered During Alignment
295
FAILED
30/5011:55:3811:55:391h 0m 0s205 KB0 B
Checkpoint Detail:
Path: - Discarded: - Failure Message: Checkpoint expired before completing.
Operators:
Name
Acknowledged
Latest Acknowledgment
End to End Duration
State Size
Buffered During Alignment
Source: dw-member
6/10 (60%)11:55:391s7.08 KB0 B
Source: wi-order
6/10 (60%)11:55:391s7.11 KB0 B
Source: dw-pay
6/10 (60%)11:55:391s7.11 KB0 B
RecordTransformOperator
6/10 (60%)11:55:391s98.8 KB0 B
RecordComputeOperator -> Sink: dw-record-data-sink
6/10 (60%)11:55:391s85.1 KB0 B
SubTasks:
End to End Duration
State Size
Checkpoint Duration (Sync)
Checkpoint Duration (Async)
Alignment Buffered
Alignment Duration
Minimum1s14.2 KB7ms841ms0 B13ms
Average1s14.2 KB94ms1s0 B13ms
Maximum1s14.2 KB181ms1s0 B15ms
ID
Acknowledgement Time
E2E Duration
State Size
Checkpoint Duration (Sync)
Checkpoint Duration (Async)
Align Buffered
Align Duration
1n/a
211:55:391s14.2 KB8ms1s0 B15ms
3n/a
411:55:391s14.2 KB181ms1s0 B13ms
5n/a
611:55:391s14.2 KB8ms1s0 B14ms
711:55:391s14.2 KB181ms961ms0 B13ms
8n/a
911:55:391s14.2 KB181ms841ms0 B13ms
1011:55:391s14.2 KB7ms1s0 B14ms


请问,这类问题如何排查,有没有好的建议或者最佳实践?谢谢!


checkpoint??????????

2021-11-08 文章 ??????
:
flink on yarn ??flink 
hdfs,ark1??hdfs??active??standby
ark2standbyactive
:??flink??checkpoint??hdfs??url??hdfs:ark:8082 
,standby??,




hdfs??mycluster checkpoint
final String HADOOP_CONF_DIR = "/etc/hadoop/conf";
org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
configuration.addResource(new Path(HADOOP_CONF_DIR + "/core-site.xml"));
configuration.addResource(new Path(HADOOP_CONF_DIR + "/hdfs-site.xml"));

env.setStateBackend(new FsStateBackend("hdfs://mycluster/flinkCheckpoint"));   
//??

??
Caused by: java.io.IOException: Cannot instantiate file system for URI: 
hdfs://mycluster/flinkCheckpoint
at 
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:196)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:527)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:408)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.

checkpoint??????????

2021-11-07 文章 ??????
:
flink on yarn ??flink 
hdfs,ark1??hdfs??active??standby
ark2standbyactive
:??flink??checkpoint??hdfs??url??hdfs:ark:8082 
,standby??,


??




Re: Re: 回复:回复:Re: 在开启checkpoint后如何设置offset的自动提交以方便监控

2021-11-04 文章 zhisheng
考虑 currentOffsets 吧

杨浩  于2021年10月27日周三 下午5:40写道:

> 明白这个逻辑,这个就导致consumer
> lag值不能反映真实情况,而很难监控系统延迟一个场景:业务状态很大,5分钟保存一次,QPS在1~100之间波动,那么需要配置延迟大于5*60*100来监控系统,这会导致监控非常不准确
> 在 2021-10-27 17:34:13,"Qingsheng Ren"  写道:
> >你好!
> >
> >如果使用的是基于 FLIP-27 实现的 KafkaSource,可以配置 enable.auto.commit = true 和
> auto.commit.interval.ms = {commit_interval} 使 KafkaSource 按照指定的时间间隔自动提交
> offset。基于 SourceFunction 的 FlinkKafkaConsumer 在 checkpoint 开启时不支持自动提交,只能在
> checkpoint 时提交位点。
> >
> >--
> >Best Regards,
> >
> >Qingsheng Ren
> >Email: renqs...@gmail.com
> >On Oct 27, 2021, 4:59 PM +0800, 杨浩 , wrote:
> >> 请问有办法和现有监控兼容么?开启checkpoint时,让消费组的offset实时更新
> >> 在 2021-10-25 21:58:28,"杨浩"  写道:
> >> > currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets
> >> > 在 2021-10-25 10:31:12,"Caizhi Weng"  写道:
> >> > > Hi!
> >> > >
> >> > > 这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取
> offset,可以通过
> >> > > metrics 读取,见 [1]。
> >> > >
> >> > > [1]
> >> > >
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
> >> > >
> >> > > 杨浩  于2021年10月25日周一 上午10:20写道:
> >> > >
> >> > > >
> 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度
>


Re: 关于作业失败从checkpoint重启,触发了过期的窗口计算

2021-10-31 文章 liwei li
我是指添加ContinuousProcessingTimeTrigger或者ContinuousEventTimeTrigger。
这个不见得能解决你当前的问题,只是每隔一段时间触发一下窗口计算,得到一个中间结果缓存起来,无需等窗口结束再计算总量。

claylin <1012539...@qq.com.invalid> 于2021年10月31日周日 下午4:33写道:

> 添加continue trigger怎么理解
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> hilili...@gmail.com;
> 发送时间:2021年10月30日(星期六) 晚上10:58
> 收件人:"user-zh"
> 主题:Re: 关于作业失败从checkpoint重启,触发了过期的窗口计算
>
>
>
> 可以试试添加使用Continuou Trigger
>
> Yun Tang 
>  Hi,
> 
>  先问个版本问题,你的Flink版本是1.3 而不是1.13?
> 
> 
> 
> Checkpoint里面会存储timer,所以重启之后会触发窗口的计算,但确实这种一天的窗口累计有点太多了,除非你的作业存在比较严重的反压,导致checkpoint内积攒了大量没有触发的timer。
> 
>  祝好
>  唐云
> 
>  
>  From: claylin <1012539...@qq.com.INVALID
>  Sent: Friday, October 29, 2021 11:33
>  To: user-zh   Subject: 关于作业失败从checkpoint重启,触发了过期的窗口计算
> 
> 
> 
> 作业失败后从checkpoint重启,重启后会触发之前已经过期的时间窗口计算,求问这个该怎么解决。我的运行环境是flink1.3/1.4+sql举例:作业每小时做一次checkpoint,状态设置了1小时过期,状态后端使用rocksdb,同时使用了一天的滚动时间窗口,然后今天15点30分重启,但是重启后会有昨天的窗口计算结果触发。
>  按理说不应该会触发已经过期的窗口计算,而且flink 1.3/1.4
>  下nbsp;nbsp;state.backend.rocksdb.timer-service.factory
> 这个配置默认是rocksdb,
>  也就是说rocksdb里面存储了状态的有效时间,
>  不管怎么样也不应该触发已经过期的窗口计算,
>  请问大家有没有遇到过这种问题,怎么解决。
> 


?????? ??????????????checkpoint??????????????????????????

2021-10-31 文章 claylin
continue trigger




----
??: 
   "user-zh"



Re: 关于作业失败从checkpoint重启,触发了过期的窗口计算

2021-10-30 文章 liwei li
可以试试添加使用Continuou Trigger

Yun Tang  于2021年10月29日周五 下午5:56写道:

> Hi,
>
> 先问个版本问题,你的Flink版本是1.3 而不是1.13?
>
>
> Checkpoint里面会存储timer,所以重启之后会触发窗口的计算,但确实这种一天的窗口累计有点太多了,除非你的作业存在比较严重的反压,导致checkpoint内积攒了大量没有触发的timer。
>
> 祝好
> 唐云
>
> 
> From: claylin <1012539...@qq.com.INVALID>
> Sent: Friday, October 29, 2021 11:33
> To: user-zh 
> Subject: 关于作业失败从checkpoint重启,触发了过期的窗口计算
>
>
> 作业失败后从checkpoint重启,重启后会触发之前已经过期的时间窗口计算,求问这个该怎么解决。我的运行环境是flink1.3/1.4+sql举例:作业每小时做一次checkpoint,状态设置了1小时过期,状态后端使用rocksdb,同时使用了一天的滚动时间窗口,然后今天15点30分重启,但是重启后会有昨天的窗口计算结果触发。
> 按理说不应该会触发已经过期的窗口计算,而且flink 1.3/1.4
> 下state.backend.rocksdb.timer-service.factory 这个配置默认是rocksdb,
> 也就是说rocksdb里面存储了状态的有效时间,
> 不管怎么样也不应该触发已经过期的窗口计算,
> 请问大家有没有遇到过这种问题,怎么解决。
>


Re: 增量checkpoint是否可以用来恢复flink作业

2021-10-30 文章 liwei li
增量checkpoint是可以恢复作业的。

Flink 的增量 checkpoint 以 RocksDB 的 checkpoint 为基础。RocksDB 是一个 LSM 结构的 KV
> 数据库,把所有的修改保存在内存的可变缓存中(称为 memtable),所有对 memtable 中 key 的修改,会覆盖之前的 value,当前
> memtable 满了之后,RocksDB 会将所有数据以有序的写到磁盘。当 RocksDB 将 memtable
> 写到磁盘后,整个文件就不再可变,称为有序字符串表(sstable)。
> RocksDB 的后台压缩线程会将 sstable 进行合并,就重复的键进行合并,合并后的 sstable 包含所有的键值对,RocksDB
> 会删除合并前的 sstable。
> 在这个基础上,Flink 会记录上次 checkpoint 之后所有新生成和删除的 sstable,另外因为 sstable 是不可变的,Flink
> 用 sstable 来记录状态的变化。为此,Flink 调用 RocksDB 的 flush,强制将 memtable 的数据全部写到
> sstable,并硬链到一个临时目录中。这个步骤是在同步阶段完成,其他剩下的部分都在异步阶段完成,不会阻塞正常的数据处理。
> Flink 将所有新生成的 sstable 备份到持久化存储(比如 HDFS,S3),并在新的 checkpoint 中引用。Flink
> 并不备份前一个 checkpoint 中已经存在的 sstable,而是引用他们。Flink 还能够保证所有的 checkpoint
> 都不会引用已经删除的文件,因为 RocksDB 中文件删除是由压缩完成的,压缩后会将原来的内容合并写成一个新的 sstable。因此,Flink 增量
> checkpoint 能够切断 checkpoint 历史。
> 为了追踪 checkpoint 间的差距,备份合并后的 sstable 是一个相对冗余的操作。但是 Flink
> 会增量的处理,增加的开销通常很小,并且可以保持一个更短的 checkpoint 历史,恢复时从更少的 checkpoint
> 进行读取文件,因此我们认为这是值得的。


以上内容引用自:Apache Flink 管理大型状态之增量 Checkpoint 详解
<https://flink-learning.org.cn/article/detail/6636e468bc961f789f8d0ba8ffd51a95>

里面详细介绍了增量checkpoint的原理、容灾恢复以及性能,有兴趣可以参考一下。

casel.chen  于2021年10月28日周四 上午10:48写道:

> 增量checkpoint是否可以用来恢复flink作业?
> 增量checkpoint我理解是有一个base checkpoint + 若干个delta checkpoint
> (中间会做一次全量checkpoint以截断过长的血缘吗?),恢复的时候需要从base checkpoint开始一个个按时间顺序应用delta
> checkpoint。
> 按这样的话,每个delta
> checkpoint都需要保留才可以恢复状态,但现实并不是所有checkpoint都保留,所以我觉得增量checkpoint是不能用来恢复flink作业的,这样理解对吗?


Re: flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-29 文章 Yun Tang
Hi

可以使用jstack,async profiler [1] 
等工具勘察一下checkpoint期间的CPU栈。oss需要先写本地再上传,确实可能CPU消耗多一些,但是明显高很多有一些超出预期。


[1] https://github.com/jvm-profiling-tools/async-profiler

祝好
唐云

From: Lei Wang 
Sent: Tuesday, October 19, 2021 14:01
To: user-zh@flink.apache.org 
Subject: Re: flink 以阿里云 oss 作为 checkpoint cpu 过高

确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。
程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G


On Mon, Oct 18, 2021 at 10:44 AM Michael Ran  wrote:

> 应该和OSS没关系吧,毕竟只是个存储。
> 我们CPU 你先看看消耗在哪个线程或者方法类呗
>
>
>
> 在 2021-10-08 16:34:47,"Lei Wang"  写道:
>
>
>
> flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
> 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。
>
>
>
>
>
>
> 这个可能的原因是什么?会跟 OSS 有关吗?
>
>
> 谢谢,
> 王磊


Re: 关于作业失败从checkpoint重启,触发了过期的窗口计算

2021-10-29 文章 Yun Tang
Hi,

先问个版本问题,你的Flink版本是1.3 而不是1.13?

Checkpoint里面会存储timer,所以重启之后会触发窗口的计算,但确实这种一天的窗口累计有点太多了,除非你的作业存在比较严重的反压,导致checkpoint内积攒了大量没有触发的timer。

祝好
唐云


From: claylin <1012539...@qq.com.INVALID>
Sent: Friday, October 29, 2021 11:33
To: user-zh 
Subject: 关于作业失败从checkpoint重启,触发了过期的窗口计算

作业失败后从checkpoint重启,重启后会触发之前已经过期的时间窗口计算,求问这个该怎么解决。我的运行环境是flink1.3/1.4+sql举例:作业每小时做一次checkpoint,状态设置了1小时过期,状态后端使用rocksdb,同时使用了一天的滚动时间窗口,然后今天15点30分重启,但是重启后会有昨天的窗口计算结果触发。
按理说不应该会触发已经过期的窗口计算,而且flink 1.3/1.4 
下state.backend.rocksdb.timer-service.factory 这个配置默认是rocksdb, 
也就是说rocksdb里面存储了状态的有效时间,
不管怎么样也不应该触发已经过期的窗口计算,
请问大家有没有遇到过这种问题,怎么解决。


??????????????checkpoint??????????????????????????

2021-10-28 文章 claylin
checkpointflink1.3/1.4+sql??checkpoint1??rocksdb15??30??
??flink 1.3/1.4 
??state.backend.rocksdb.timer-service.factory 
??rocksdb?? rocksdb??
??


增量checkpoint是否可以用来恢复flink作业

2021-10-27 文章 casel.chen
增量checkpoint是否可以用来恢复flink作业? 
增量checkpoint我理解是有一个base checkpoint + 若干个delta checkpoint 
(中间会做一次全量checkpoint以截断过长的血缘吗?),恢复的时候需要从base checkpoint开始一个个按时间顺序应用delta 
checkpoint。
按这样的话,每个delta 
checkpoint都需要保留才可以恢复状态,但现实并不是所有checkpoint都保留,所以我觉得增量checkpoint是不能用来恢复flink作业的,这样理解对吗?

回复:Re: 回复:回复:Re: 在开启checkpoint后如何设置offset的自动提交以方便监控

2021-10-27 文章 杨浩
明白这个逻辑,这个就导致consumer 
lag值不能反映真实情况,而很难监控系统延迟一个场景:业务状态很大,5分钟保存一次,QPS在1~100之间波动,那么需要配置延迟大于5*60*100来监控系统,这会导致监控非常不准确
在 2021-10-27 17:34:13,"Qingsheng Ren"  写道:
>你好!
>
>如果使用的是基于 FLIP-27 实现的 KafkaSource,可以配置 enable.auto.commit = true 和 
>auto.commit.interval.ms = {commit_interval} 使 KafkaSource 按照指定的时间间隔自动提交 
>offset。基于 SourceFunction 的 FlinkKafkaConsumer 在 checkpoint 开启时不支持自动提交,只能在 
>checkpoint 时提交位点。
>
>--
>Best Regards,
>
>Qingsheng Ren
>Email: renqs...@gmail.com
>On Oct 27, 2021, 4:59 PM +0800, 杨浩 , wrote:
>> 请问有办法和现有监控兼容么?开启checkpoint时,让消费组的offset实时更新
>> 在 2021-10-25 21:58:28,"杨浩"  写道:
>> > currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets
>> > 在 2021-10-25 10:31:12,"Caizhi Weng"  写道:
>> > > Hi!
>> > >
>> > > 这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过
>> > > metrics 读取,见 [1]。
>> > >
>> > > [1]
>> > > https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
>> > >
>> > > 杨浩  于2021年10月25日周一 上午10:20写道:
>> > >
>> > > > 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度


Re: 回复:回复:Re: 在开启checkpoint后如何设置offset的自动提交以方便监控

2021-10-27 文章 Qingsheng Ren
你好!

如果使用的是基于 FLIP-27 实现的 KafkaSource,可以配置 enable.auto.commit = true 和 
auto.commit.interval.ms = {commit_interval} 使 KafkaSource 按照指定的时间间隔自动提交 
offset。基于 SourceFunction 的 FlinkKafkaConsumer 在 checkpoint 开启时不支持自动提交,只能在 
checkpoint 时提交位点。

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Oct 27, 2021, 4:59 PM +0800, 杨浩 , wrote:
> 请问有办法和现有监控兼容么?开启checkpoint时,让消费组的offset实时更新
> 在 2021-10-25 21:58:28,"杨浩"  写道:
> > currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets
> > 在 2021-10-25 10:31:12,"Caizhi Weng"  写道:
> > > Hi!
> > >
> > > 这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过
> > > metrics 读取,见 [1]。
> > >
> > > [1]
> > > https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
> > >
> > > 杨浩  于2021年10月25日周一 上午10:20写道:
> > >
> > > > 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度


回复:回复:Re: 在开启checkpoint后如何设置offset的自动提交以方便监控

2021-10-27 文章 杨浩
请问有办法和现有监控兼容么?开启checkpoint时,让消费组的offset实时更新
在 2021-10-25 21:58:28,"杨浩"  写道:
>currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets
>在 2021-10-25 10:31:12,"Caizhi Weng"  写道:
>>Hi!
>>
>>这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过
>>metrics 读取,见 [1]。
>>
>>[1]
>>https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
>>
>>杨浩  于2021年10月25日周一 上午10:20写道:
>>
>>> 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度


回复:Re: 在开启checkpoint后如何设置offset的自动提交以方便监控

2021-10-25 文章 杨浩
currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets
在 2021-10-25 10:31:12,"Caizhi Weng"  写道:
>Hi!
>
>这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过
>metrics 读取,见 [1]。
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
>
>杨浩  于2021年10月25日周一 上午10:20写道:
>
>> 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度


Re: 在开启checkpoint后如何设置offset的自动提交以方便监控

2021-10-24 文章 Caizhi Weng
Hi!

这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过
metrics 读取,见 [1]。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors

杨浩  于2021年10月25日周一 上午10:20写道:

> 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度


在开启checkpoint后如何设置offset的自动提交以方便监控

2021-10-24 文章 杨浩
请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度

Re: flink修改sink并行度后,无法从checkpoint restore问题

2021-10-21 文章 yue ma
hello 这个报错看上去并不是状态不兼容的报错。 我看代码 Sink 算子设置了uid 理论上是可以正确恢复的。

kong <62...@163.com> 于2021年10月21日周四 上午10:26写道:

> hi,我遇到flink修改sink并行度后,无法从checkpoint restore问题
>
>
> flink 版本: 1.13.1
> flink on yarn
> DataStream api方式写的java job
>
>
> 试验1:不修改任何代码,cancel job后,能从指定的checkpoint恢复
>dataStream.addSink(new Sink(config)).name("").uid("");
> 试验2:只修改sink端的并行度,job无法启动,一直是Initiating状态
> dataStream.addSink(new
> Sink(config)).name("").uid("").setParallelism(2);
>
>
> 日志异常
> 2021-10-21 09:52:57,076 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at
> akka://flink/user/rpc/resourcemanager_0 .
> 2021-10-21 09:52:57,132 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}.
> 2021-10-21 09:52:57,133 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Starting the resource manager.
> 2021-10-21 09:52:57,134 INFO
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
> Starting DefaultLeaderRetrievalService with
> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
> 2021-10-21 09:52:57,135 INFO
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
> Starting DefaultLeaderRetrievalService with
> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
> 2021-10-21 09:52:57,142 INFO
> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] -
> Start JobDispatcherLeaderProcess.
> 2021-10-21 09:52:57,151 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.MiniDispatcher at
> akka://flink/user/rpc/dispatcher_1 .
> 2021-10-21 09:52:57,228 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/57645e97919d2efebfab67e2846696e7/job_manager_lock'}.
> 2021-10-21 09:52:57,312 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system
> [akka.tcp://flink@node2.hadoop:42535] has failed, address is now gated
> for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink@node2.hadoop:42535]]
> Caused by: [java.net.ConnectException: Connection refused:
> node2.hadoop/xx.xx.xx.xx:42535]
> 2021-10-21 09:52:57,313 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused:
> node2.hadoop/xx.xx.xx.xx:42535
> 2021-10-21 09:52:57,352 INFO
> org.apache.flink.yarn.YarnResourceManagerDriver  [] - Recovered
> 0 containers from previous attempts ([]).
> 2021-10-21 09:52:57,353 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 0 workers from previous attempt.
> 2021-10-21 09:52:57,379 INFO  org.apache.hadoop.conf.Configuration
>  [] - resource-types.xml not found
> 2021-10-21 09:52:57,379 INFO
> org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to
> find 'resource-types.xml'.
> 2021-10-21 09:52:57,394 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
> 2021-10-21 09:52:57,395 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused:
> node2.hadoop/xx.xx.xx.xx:42535
> 2021-10-21 09:52:57,396 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system
> [akka.tcp://flink@node2.hadoop:42535] has failed, address is now gated
> for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink@node2.hadoop:42535]]
> Caused by: [java.net.ConnectException: Connection refused:
> node2.hadoop/xx.xx.xx.xx:42535]
> 2021-10-21 09:52:57,400 INFO
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper
> bound of the thread pool size is 500
> 2021-10-21 09:52:57,403 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
> 2021-10-21 09:52:57,407 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> ResourceManager akka.tcp://flink@node3.hadoop:43978/user/rpc/resourcemanager_0
> was granted leadership with fencing token ae27185bb3fda634d2c109510ab54ba4
>
>
>
>
>
>


flink修改sink并行度后,无法从checkpoint restore问题

2021-10-20 文章 kong
hi,我遇到flink修改sink并行度后,无法从checkpoint restore问题


flink 版本: 1.13.1
flink on yarn
DataStream api方式写的java job


试验1:不修改任何代码,cancel job后,能从指定的checkpoint恢复
   dataStream.addSink(new Sink(config)).name("").uid("");
试验2:只修改sink端的并行度,job无法启动,一直是Initiating状态
dataStream.addSink(new 
Sink(config)).name("").uid("").setParallelism(2);


日志异常
2021-10-21 09:52:57,076 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at 
akka://flink/user/rpc/resourcemanager_0 .
2021-10-21 09:52:57,132 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}.
2021-10-21 09:52:57,133 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Starting the resource manager.
2021-10-21 09:52:57,134 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Starting DefaultLeaderRetrievalService with 
ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-10-21 09:52:57,135 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Starting DefaultLeaderRetrievalService with 
ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-10-21 09:52:57,142 INFO  
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - 
Start JobDispatcherLeaderProcess.
2021-10-21 09:52:57,151 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.MiniDispatcher at 
akka://flink/user/rpc/dispatcher_1 .
2021-10-21 09:52:57,228 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/57645e97919d2efebfab67e2846696e7/job_manager_lock'}.
2021-10-21 09:52:57,312 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink@node2.hadoop:42535] has failed, address is now gated for [50] 
ms. Reason: [Association failed with [akka.tcp://flink@node2.hadoop:42535]] 
Caused by: [java.net.ConnectException: Connection refused: 
node2.hadoop/xx.xx.xx.xx:42535]
2021-10-21 09:52:57,313 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: node2.hadoop/xx.xx.xx.xx:42535
2021-10-21 09:52:57,352 INFO  org.apache.flink.yarn.YarnResourceManagerDriver   
   [] - Recovered 0 containers from previous attempts ([]).
2021-10-21 09:52:57,353 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.
2021-10-21 09:52:57,379 INFO  org.apache.hadoop.conf.Configuration  
   [] - resource-types.xml not found
2021-10-21 09:52:57,379 INFO  
org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to 
find 'resource-types.xml'.
2021-10-21 09:52:57,394 INFO  
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled 
external resources: []
2021-10-21 09:52:57,395 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: node2.hadoop/xx.xx.xx.xx:42535
2021-10-21 09:52:57,396 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink@node2.hadoop:42535] has failed, address is now gated for [50] 
ms. Reason: [Association failed with [akka.tcp://flink@node2.hadoop:42535]] 
Caused by: [java.net.ConnectException: Connection refused: 
node2.hadoop/xx.xx.xx.xx:42535]
2021-10-21 09:52:57,400 INFO  
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound 
of the thread pool size is 500
2021-10-21 09:52:57,403 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
2021-10-21 09:52:57,407 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
ResourceManager akka.tcp://flink@node3.hadoop:43978/user/rpc/resourcemanager_0 
was granted leadership with fencing token ae27185bb3fda634d2c109510ab54ba4







Re: flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-19 文章 Lei Wang
确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。
程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G


On Mon, Oct 18, 2021 at 10:44 AM Michael Ran  wrote:

> 应该和OSS没关系吧,毕竟只是个存储。
> 我们CPU 你先看看消耗在哪个线程或者方法类呗
>
>
>
> 在 2021-10-08 16:34:47,"Lei Wang"  写道:
>
>
>
> flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
> 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。
>
>
>
>
>
>
> 这个可能的原因是什么?会跟 OSS 有关吗?
>
>
> 谢谢,
> 王磊


Re:flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-17 文章 Michael Ran
应该和OSS没关系吧,毕竟只是个存储。
我们CPU 你先看看消耗在哪个线程或者方法类呗



在 2021-10-08 16:34:47,"Lei Wang"  写道:



flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。





 
这个可能的原因是什么?会跟 OSS 有关吗? 


谢谢,
王磊

Re:​异步IO算子无法完成checkpoint

2021-10-11 文章 李一飞
图片上传到附件中了
















在 2021-10-12 10:33:12,"李一飞"  写道:

异步IO算子无法完成checkpoint,帮忙看下是什么原因  




 

Re: ​异步IO算子无法完成checkpoint

2021-10-11 文章 Caizhi Weng
Hi!

图片无法在邮件中显示,请检查。

李一飞  于2021年10月12日周二 上午10:33写道:

> 异步IO算子无法完成checkpoint,帮忙看下是什么原因
>
>
>
>


​异步IO算子无法完成checkpoint

2021-10-11 文章 李一飞
异步IO算子无法完成checkpoint,帮忙看下是什么原因  

flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-08 文章 Lei Wang
flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。

[image: image.png]

这个可能的原因是什么?会跟 OSS 有关吗?

谢谢,
王磊


Re: HOP窗口较短导致checkpoint失败

2021-09-21 文章 xiaohui zhang
checkpoint的状态大约只有50M左右就会开始出现cp失败的问题。如果失败了,尝试停止任务生成savepoint基本也不能成功。但同时运行的其他任务,cp在300M左右,
save point 1G左右的就很顺利,基本不会出问题。
因为实际的数据压力并不是很大,如果单纯增加并行度,是否能在窗口多的情况下有比较明显的改善呢?

Caizhi Weng  于2021年9月22日周三 上午11:27写道:

> Hi!
>
> 24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致 cp 被 back
> pressure 也有可能导致 cp 超时。开启 mini batch 可以加快 window 的运算速度,但这么长时间而且这么频繁的 window
> 目前确实没有什么很好的优化方法,仍然建议扩大并发以分担计算以及 cp 的压力。
>
> xiaohui zhang  于2021年9月18日周六 上午9:54写道:
>
> > FLink:1.12.1
> >
> > 源: kafka
> > create table dev_log (
> > devid,
> > ip,
> > op_ts
> > ) with (
> > connector = kafka
> > )
> >
> > sink: Hbase connect 2.2
> >
> > 目前用flink sql的hop
> > window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
> > 执行SQL如下
> > insert into h_table
> > select
> >   devid as rowkey
> >   row(hop_end, ip_cnt)
> > from (
> >   select
> >  devid,
> >  hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
> >  count(distinct(ip)) as ip_cnt
> > from
> >   dev_logs
> >     group by
> >hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
> >   devid
> > )
> >
> > 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
> > 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
> > 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
> > 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢
> >
>


Re: HOP窗口较短导致checkpoint失败

2021-09-21 文章 Caizhi Weng
Hi!

24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致 cp 被 back
pressure 也有可能导致 cp 超时。开启 mini batch 可以加快 window 的运算速度,但这么长时间而且这么频繁的 window
目前确实没有什么很好的优化方法,仍然建议扩大并发以分担计算以及 cp 的压力。

xiaohui zhang  于2021年9月18日周六 上午9:54写道:

> FLink:1.12.1
>
> 源: kafka
> create table dev_log (
> devid,
> ip,
> op_ts
> ) with (
> connector = kafka
> )
>
> sink: Hbase connect 2.2
>
> 目前用flink sql的hop
> window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
> 执行SQL如下
> insert into h_table
> select
>   devid as rowkey
>   row(hop_end, ip_cnt)
> from (
>   select
>  devid,
>  hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
>  count(distinct(ip)) as ip_cnt
> from
>   dev_logs
> group by
>hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
>   devid
> )
>
> 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
> 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
> 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
> 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢
>


HOP窗口较短导致checkpoint失败

2021-09-17 文章 xiaohui zhang
FLink:1.12.1

源: kafka
create table dev_log (
devid,
ip,
op_ts
) with (
connector = kafka
)

sink: Hbase connect 2.2

目前用flink sql的hop window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
执行SQL如下
insert into h_table
select
  devid as rowkey
  row(hop_end, ip_cnt)
from (
  select
 devid,
 hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
 count(distinct(ip)) as ip_cnt
from
  dev_logs
group by
   hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
  devid
)

测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢


Re: Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-12 文章 Tony Wei
Hi

從代碼上來看是使用了 regular join 關聯了 kafka source 和 hbase source,hbase connector
目前是不支持流式數據源的
你可以從任務儀表板上確認下提交的任務,hbase source 的部分應該在執行一段時間後狀態會變更為 FINISHED,目前 flink
checkpoint 還不支持在 FINISHED task 上執行

你可以考慮改寫 sql 使用 processing time temporal join [1] 的方式來關聯 hbase table,從 kafka
消費的數據會實時的去查 hbase table 的當前數據做關聯。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

chang li  於 2021年9月10日 週五 下午7:40寫道:

> 没有开启Checkpoint
> execEnv.enableCheckpointing(checkpointInterval);
>
> On 2021/09/10 07:41:10, "xia_...@163.com"  wrote:
> > Hi:
> > 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请问一下是需要做其他什么配置吗?代码如下
> >
> > DataStream kafkaSource = env.addSource(source);
> > Map> sideOutStreamMap = new HashMap<>();
> > for (RowToColumnBean bean : lists) {
> > OutputTag app = new
> OutputTag(bean.getMainTable()) {
> > };
> > sideOutStreamMap.put(bean.getMainTable(), app);
> > }
> >
> > RowToNumberProcessFunction rowToNumberProcessFunction = new
> RowToNumberProcessFunction(sideOutStreamMap, lists);
> > SingleOutputStreamOperator process =
> kafkaSource.process(rowToNumberProcessFunction);
> >
> > EnvironmentSettings settings = EnvironmentSettings.newInstance()
> > .useBlinkPlanner()
> > .inStreamingMode()
> > .build();
> >
> > StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env,
> settings, new TableConfig());
> > //设置checkpoint
> >
> tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval",
> "10 s");
> >
> > for (RowToColumnBean bean : lists) {
> > DataStream dataStream =
> process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> >
> > String mainTable = bean.getMainTable().split("
> ")[0].split("\\.")[1].toLowerCase();
> >
> > //Table tmpTable = tableEnv.fromDataStream(dataStream,
> StrUtil.list2Str(bean.getQueryColumns()));
> >
> > tableEnv.createTemporaryView(mainTable, dataStream);
> >
> > String joinTable = mainTable + "_join";
> > tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
> > "rowkey STRING,\n" +
> > "info ROW,\n" +
> > "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
> > ") WITH (\n" +
> > "'connector' = 'hbase-2.2',\n" +
> > "'table-name' =
> 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
> > "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
> > "'zookeeper.znode.parent' = '/hbase'\n" +
> > ")");
> >
> >
> > //查询数据
> > //Table table = tableEnv.sqlQuery("select b.* from tmp a left join
> dformfiled b on a.key = b.rowkey");
> > Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable +
> " a left join " + joinTable + " b on a.key = lower(b.rowkey) and
> b.formid='550' where b.rowkey is not null");
> >
> > TableSchema schema = table.getSchema();
> > schema.getTableColumns().forEach(column -> {
> >
> > System.err.println(column.asSummaryString());
> > });
> >
> > DataStream> tuple2DataStream =
> tableEnv.toRetractStream(table, Row.class);
> > tuple2DataStream.print(mainTable);
> > dataStream.print(mainTable);
> > }
> >
> >
> > xia_...@163.com
> >
>


Re: Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-10 文章 chang li
没有开启checkpoint
execEnv.enableCheckpointing(checkpointInterval);

On 2021/09/10 07:41:10, "xia_...@163.com"  wrote: 
> Hi:
> 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume 
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请问一下是需要做其他什么配置吗?代码如下
> 
> DataStream kafkaSource = env.addSource(source);
> Map> sideOutStreamMap = new HashMap<>();
> for (RowToColumnBean bean : lists) {
> OutputTag app = new OutputTag(bean.getMainTable()) {
> };
> sideOutStreamMap.put(bean.getMainTable(), app);
> }
> 
> RowToNumberProcessFunction rowToNumberProcessFunction = new 
> RowToNumberProcessFunction(sideOutStreamMap, lists);
> SingleOutputStreamOperator process = 
> kafkaSource.process(rowToNumberProcessFunction);
> 
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> 
> StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, 
> settings, new TableConfig());
> //设置checkpoint
> tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval",
>  "10 s");
> 
> for (RowToColumnBean bean : lists) {
> DataStream dataStream = 
> process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> 
> String mainTable = bean.getMainTable().split(" 
> ")[0].split("\\.")[1].toLowerCase();
> 
> //Table tmpTable = tableEnv.fromDataStream(dataStream, 
> StrUtil.list2Str(bean.getQueryColumns()));
> 
> tableEnv.createTemporaryView(mainTable, dataStream);
> 
> String joinTable = mainTable + "_join";
> tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
> "rowkey STRING,\n" +
> "info ROW,\n" +
> "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
> ") WITH (\n" +
> "'connector' = 'hbase-2.2',\n" +
> "'table-name' = 
> 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
> "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
> "'zookeeper.znode.parent' = '/hbase'\n" +
> ")");
> 
> 
> //查询数据
> //Table table = tableEnv.sqlQuery("select b.* from tmp a left join  
> dformfiled b on a.key = b.rowkey");
> Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a 
> left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' 
> where b.rowkey is not null");
> 
> TableSchema schema = table.getSchema();
> schema.getTableColumns().forEach(column -> {
> 
> System.err.println(column.asSummaryString());
> });
> 
> DataStream> tuple2DataStream = 
> tableEnv.toRetractStream(table, Row.class);
> tuple2DataStream.print(mainTable);
> dataStream.print(mainTable);
> }
> 
> 
> xia_...@163.com
> 


Re: Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-10 文章 chang li
没有开启checkpoint
execEnv.enableCheckpointing(checkpointInterval);

On 2021/09/10 07:41:10, "xia_...@163.com"  wrote: 
> Hi:
> 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume 
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请问一下是需要做其他什么配置吗?代码如下
> 
> DataStream kafkaSource = env.addSource(source);
> Map> sideOutStreamMap = new HashMap<>();
> for (RowToColumnBean bean : lists) {
> OutputTag app = new OutputTag(bean.getMainTable()) {
> };
> sideOutStreamMap.put(bean.getMainTable(), app);
> }
> 
> RowToNumberProcessFunction rowToNumberProcessFunction = new 
> RowToNumberProcessFunction(sideOutStreamMap, lists);
> SingleOutputStreamOperator process = 
> kafkaSource.process(rowToNumberProcessFunction);
> 
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> 
> StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, 
> settings, new TableConfig());
> //设置checkpoint
> tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval",
>  "10 s");
> 
> for (RowToColumnBean bean : lists) {
> DataStream dataStream = 
> process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> 
> String mainTable = bean.getMainTable().split(" 
> ")[0].split("\\.")[1].toLowerCase();
> 
> //Table tmpTable = tableEnv.fromDataStream(dataStream, 
> StrUtil.list2Str(bean.getQueryColumns()));
> 
> tableEnv.createTemporaryView(mainTable, dataStream);
> 
> String joinTable = mainTable + "_join";
> tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
> "rowkey STRING,\n" +
> "info ROW,\n" +
> "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
> ") WITH (\n" +
> "'connector' = 'hbase-2.2',\n" +
> "'table-name' = 
> 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
> "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
> "'zookeeper.znode.parent' = '/hbase'\n" +
> ")");
> 
> 
> //查询数据
> //Table table = tableEnv.sqlQuery("select b.* from tmp a left join  
> dformfiled b on a.key = b.rowkey");
> Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a 
> left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' 
> where b.rowkey is not null");
> 
> TableSchema schema = table.getSchema();
> schema.getTableColumns().forEach(column -> {
> 
> System.err.println(column.asSummaryString());
> });
> 
> DataStream> tuple2DataStream = 
> tableEnv.toRetractStream(table, Row.class);
> tuple2DataStream.print(mainTable);
> dataStream.print(mainTable);
> }
> 
> 
> xia_...@163.com
> 


Re: Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-10 文章 chang li
没有开启checkpoint
execEnv.enableCheckpointing(checkpointInterval);

On 2021/09/10 07:41:10, "xia_...@163.com"  wrote: 
> Hi:
> 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume 
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请问一下是需要做其他什么配置吗?代码如下
> 
> DataStream kafkaSource = env.addSource(source);
> Map> sideOutStreamMap = new HashMap<>();
> for (RowToColumnBean bean : lists) {
> OutputTag app = new OutputTag(bean.getMainTable()) {
> };
> sideOutStreamMap.put(bean.getMainTable(), app);
> }
> 
> RowToNumberProcessFunction rowToNumberProcessFunction = new 
> RowToNumberProcessFunction(sideOutStreamMap, lists);
> SingleOutputStreamOperator process = 
> kafkaSource.process(rowToNumberProcessFunction);
> 
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> 
> StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, 
> settings, new TableConfig());
> //设置checkpoint
> tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval",
>  "10 s");
> 
> for (RowToColumnBean bean : lists) {
> DataStream dataStream = 
> process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> 
> String mainTable = bean.getMainTable().split(" 
> ")[0].split("\\.")[1].toLowerCase();
> 
> //Table tmpTable = tableEnv.fromDataStream(dataStream, 
> StrUtil.list2Str(bean.getQueryColumns()));
> 
> tableEnv.createTemporaryView(mainTable, dataStream);
> 
> String joinTable = mainTable + "_join";
> tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
> "rowkey STRING,\n" +
> "info ROW,\n" +
> "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
> ") WITH (\n" +
> "'connector' = 'hbase-2.2',\n" +
> "'table-name' = 
> 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
> "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
> "'zookeeper.znode.parent' = '/hbase'\n" +
> ")");
> 
> 
> //查询数据
> //Table table = tableEnv.sqlQuery("select b.* from tmp a left join  
> dformfiled b on a.key = b.rowkey");
> Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a 
> left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' 
> where b.rowkey is not null");
> 
> TableSchema schema = table.getSchema();
> schema.getTableColumns().forEach(column -> {
> 
> System.err.println(column.asSummaryString());
> });
> 
> DataStream> tuple2DataStream = 
> tableEnv.toRetractStream(table, Row.class);
> tuple2DataStream.print(mainTable);
> dataStream.print(mainTable);
> }
> 
> 
> xia_...@163.com
> 


Re: Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-10 文章 chang li
没有开启checkpoint
execEnv.enableCheckpointing(checkpointInterval);

On 2021/09/10 07:41:10, "xia_...@163.com"  wrote: 
> Hi:
> 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume 
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请问一下是需要做其他什么配置吗?代码如下
> 
> DataStream kafkaSource = env.addSource(source);
> Map> sideOutStreamMap = new HashMap<>();
> for (RowToColumnBean bean : lists) {
> OutputTag app = new OutputTag(bean.getMainTable()) {
> };
> sideOutStreamMap.put(bean.getMainTable(), app);
> }
> 
> RowToNumberProcessFunction rowToNumberProcessFunction = new 
> RowToNumberProcessFunction(sideOutStreamMap, lists);
> SingleOutputStreamOperator process = 
> kafkaSource.process(rowToNumberProcessFunction);
> 
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> 
> StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, 
> settings, new TableConfig());
> //设置checkpoint
> tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval",
>  "10 s");
> 
> for (RowToColumnBean bean : lists) {
> DataStream dataStream = 
> process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> 
> String mainTable = bean.getMainTable().split(" 
> ")[0].split("\\.")[1].toLowerCase();
> 
> //Table tmpTable = tableEnv.fromDataStream(dataStream, 
> StrUtil.list2Str(bean.getQueryColumns()));
> 
> tableEnv.createTemporaryView(mainTable, dataStream);
> 
> String joinTable = mainTable + "_join";
> tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
> "rowkey STRING,\n" +
> "info ROW,\n" +
> "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
> ") WITH (\n" +
> "'connector' = 'hbase-2.2',\n" +
> "'table-name' = 
> 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
> "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
> "'zookeeper.znode.parent' = '/hbase'\n" +
> ")");
> 
> 
> //查询数据
> //Table table = tableEnv.sqlQuery("select b.* from tmp a left join  
> dformfiled b on a.key = b.rowkey");
> Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a 
> left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' 
> where b.rowkey is not null");
> 
> TableSchema schema = table.getSchema();
> schema.getTableColumns().forEach(column -> {
> 
> System.err.println(column.asSummaryString());
> });
> 
> DataStream> tuple2DataStream = 
> tableEnv.toRetractStream(table, Row.class);
> tuple2DataStream.print(mainTable);
> dataStream.print(mainTable);
> }
> 
> 
> xia_...@163.com
> 


Re: Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-10 文章 chang li
没有开启Checkpoint
execEnv.enableCheckpointing(checkpointInterval);

On 2021/09/10 07:41:10, "xia_...@163.com"  wrote: 
> Hi:
> 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume 
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请问一下是需要做其他什么配置吗?代码如下
> 
> DataStream kafkaSource = env.addSource(source);
> Map> sideOutStreamMap = new HashMap<>();
> for (RowToColumnBean bean : lists) {
> OutputTag app = new OutputTag(bean.getMainTable()) {
> };
> sideOutStreamMap.put(bean.getMainTable(), app);
> }
> 
> RowToNumberProcessFunction rowToNumberProcessFunction = new 
> RowToNumberProcessFunction(sideOutStreamMap, lists);
> SingleOutputStreamOperator process = 
> kafkaSource.process(rowToNumberProcessFunction);
> 
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> 
> StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, 
> settings, new TableConfig());
> //设置checkpoint
> tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval",
>  "10 s");
> 
> for (RowToColumnBean bean : lists) {
> DataStream dataStream = 
> process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> 
> String mainTable = bean.getMainTable().split(" 
> ")[0].split("\\.")[1].toLowerCase();
> 
> //Table tmpTable = tableEnv.fromDataStream(dataStream, 
> StrUtil.list2Str(bean.getQueryColumns()));
> 
> tableEnv.createTemporaryView(mainTable, dataStream);
> 
> String joinTable = mainTable + "_join";
> tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
> "rowkey STRING,\n" +
> "info ROW,\n" +
> "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
> ") WITH (\n" +
> "'connector' = 'hbase-2.2',\n" +
> "'table-name' = 
> 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
> "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
> "'zookeeper.znode.parent' = '/hbase'\n" +
> ")");
> 
> 
> //查询数据
> //Table table = tableEnv.sqlQuery("select b.* from tmp a left join  
> dformfiled b on a.key = b.rowkey");
> Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a 
> left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' 
> where b.rowkey is not null");
> 
> TableSchema schema = table.getSchema();
> schema.getTableColumns().forEach(column -> {
> 
> System.err.println(column.asSummaryString());
> });
> 
> DataStream> tuple2DataStream = 
> tableEnv.toRetractStream(table, Row.class);
> tuple2DataStream.print(mainTable);
> dataStream.print(mainTable);
> }
> 
> 
> xia_...@163.com
> 


Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-10 文章 xia_...@163.com
Hi:
有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume 
消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请问一下是需要做其他什么配置吗?代码如下

DataStream kafkaSource = env.addSource(source);
Map> sideOutStreamMap = new HashMap<>();
for (RowToColumnBean bean : lists) {
OutputTag app = new OutputTag(bean.getMainTable()) {
};
sideOutStreamMap.put(bean.getMainTable(), app);
}

RowToNumberProcessFunction rowToNumberProcessFunction = new 
RowToNumberProcessFunction(sideOutStreamMap, lists);
SingleOutputStreamOperator process = 
kafkaSource.process(rowToNumberProcessFunction);

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, 
settings, new TableConfig());
//设置checkpoint
tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval",
 "10 s");

for (RowToColumnBean bean : lists) {
DataStream dataStream = 
process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));

String mainTable = bean.getMainTable().split(" 
")[0].split("\\.")[1].toLowerCase();

//Table tmpTable = tableEnv.fromDataStream(dataStream, 
StrUtil.list2Str(bean.getQueryColumns()));

tableEnv.createTemporaryView(mainTable, dataStream);

String joinTable = mainTable + "_join";
tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
"rowkey STRING,\n" +
"info ROW,\n" +
"PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
"'connector' = 'hbase-2.2',\n" +
"'table-name' = 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" 
+
"'zookeeper.quorum' = '192.168.0.115:2181',\n" +
"'zookeeper.znode.parent' = '/hbase'\n" +
")");


//查询数据
//Table table = tableEnv.sqlQuery("select b.* from tmp a left join  
dformfiled b on a.key = b.rowkey");
Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a 
left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' 
where b.rowkey is not null");

TableSchema schema = table.getSchema();
schema.getTableColumns().forEach(column -> {

System.err.println(column.asSummaryString());
});

DataStream> tuple2DataStream = 
tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print(mainTable);
dataStream.print(mainTable);
}


xia_...@163.com


Re: SQL clinet端,checkpoint 恢复执行

2021-09-08 文章 Caizhi Weng
Hi!

execution.checkpoint.path 这个参数不存在,只有
execution.savepoint.path。另外是通过什么方式设置参数的呢?是写在 flink-conf.yaml 里吗?还是通过 SET 语句?

outlook_3e5704ab57282...@outlook.com 
于2021年9月9日周四 上午9:47写道:

>
> 您好:
>
>版本:flink1.13
> 运行: flink sql
> 源表: 基于kafka建表
> 处理逻辑:简单过滤输出到hive
>
>     问题:
>
> 每1分钟做一次checkpoint,然后由于某原因需要取消任务,再次运行job如何继续从最新的checkpoint继续处理,在客户端设置了参数:
>
> execution.savepoint.path=hdfs:///user/flink/checkpoints/68e103c6ec9d8bfe459cb6c329ec696c
>
> execution.checkpoint.path=hdfs:///user/flink/checkpoints/68e103c6ec9d8bfe459cb6c329ec696c
>  重新 运行后依然是从开始消费处理,未从checkpoint恢复执行
>
> 从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送
>
>


SQL;clinet端;checkpoint;恢复执行;

2021-09-08 文章 刘保欣
您好:
 
 
 
版本:flink1.13 
 
运行: flink sql 
 
源表: 基于kafka建表
 
 处理逻辑:简单过滤输出到hive
 

 
 问题: 
 
每1分钟做一次checkpoint,然后由于某原因需要取消任务,再次运行job如何继续从最新的checkpoint继续处理,在客户端设置了参数:
 
 
execution.savepoint.path=hdfs:///user/flink/checkpoints/68e103c6ec9d8bfe459cb6c329ec696c
 
 
execution.checkpoint.path=hdfs:///user/flink/checkpoints/68e103c6ec9d8bfe459cb6c329ec696c
 
 重新 运行后依然是从开始消费处理,未从checkpoint恢复执行

  1   2   3   4   5   6   7   >