Re: flink-checkpoint issue

2024-01-11 Thread Zakelly Lan
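I looked at the code. Possible causes of this issue are:
1. Flink creates the chk directory first and only then writes the "Triggering checkpoint" log line, so there is a chance that the directory was created but the trigger log line was never emitted.
2. The job failing and the triggering of the next checkpoint run on two asynchronous threads, so it is possible that the 25548 directory was created first, then the job failed, and the process exited before the trigger log for 25548 was written.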

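In versions after 1.14.5, the code has changed behavior 1 above: it now writes the log line first and then creates the directory, so this kind of odd situation no longer occurs.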
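The ordering argument above can be illustrated with a toy model. This is a hypothetical simulation, not Flink's CheckpointCoordinator code: `CheckpointOrderingSketch`, `trigger`, and the in-memory `log` and `dirs` are stand-ins invented here for the JobManager log and checkpoint storage, and a job failure is modeled simply as the trigger stopping after its first step. With directory-first ordering, an interrupted trigger leaves a chk-25548 directory with no matching "Triggering checkpoint" line, which matches the situation discussed here; with log-first ordering, the log line is present even if the directory is never created.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CheckpointOrderingSketch {

    // Hypothetical stand-ins for the JobManager log and checkpoint storage (not Flink APIs).
    final List<String> log = new ArrayList<>();
    final Set<String> dirs = new HashSet<>();

    /**
     * Simulates triggering checkpoint {@code id}. When {@code failBetweenSteps}
     * is true, the job "fails" after the first step, so the second step never runs.
     */
    void trigger(long id, boolean logFirst, boolean failBetweenSteps) {
        String chkDir = "chk-" + id;
        String line = "Triggering checkpoint " + id;
        if (logFirst) {
            log.add(line);
            if (failBetweenSteps) return; // failure: logged, but no directory
            dirs.add(chkDir);
        } else {
            dirs.add(chkDir);
            if (failBetweenSteps) return; // failure: directory exists, but no log line
            log.add(line);
        }
    }

    public static void main(String[] args) {
        CheckpointOrderingSketch dirFirst = new CheckpointOrderingSketch();
        dirFirst.trigger(25548, false, true);
        System.out.println("dir first: dir=" + dirFirst.dirs.contains("chk-25548")
                + " logged=" + dirFirst.log.contains("Triggering checkpoint 25548"));
        // prints "dir first: dir=true logged=false"

        CheckpointOrderingSketch logFirst = new CheckpointOrderingSketch();
        logFirst.trigger(25548, true, true);
        System.out.println("log first: dir=" + logFirst.dirs.contains("chk-25548")
                + " logged=" + logFirst.log.contains("Triggering checkpoint 25548"));
        // prints "log first: dir=false logged=true"
    }
}
```

In the log-first order, a failure can still leave a trigger line without a directory, but that case is easy to interpret from the log alone.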



On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com> wrote:

> TM log:
> 2023-12-31 18:50:11.180 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task ChargeRangeBroadcastFunction -> Timestamps/Watermarks (4/6)#0 e960208bbd95b1b219bafe4887b48392.
> 2023-12-31 18:50:11.232 [Flink Netty Server (288) Thread 0] ERROR o.a.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
> java.nio.channels.ClosedChannelException: null
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:606)
>  at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
>  at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
>  at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.close(PartitionRequestQueue.java:134)
>  at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:160)
>  at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:47)
>  at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
>  at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  at java.lang.Thread.run(Thread.java:748)
>
>
> JM log (no trigger record for checkpoint 25548):
> 2023-12-31 18:39:10.664 [jobmanager-future-thread-20] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347921 bytes

Re: flink-checkpoint issue

2024-01-11 Thread 吴先生
That matches what I'm seeing, thanks. I'll look into this part of the source code when I get a chance.



Flink SQL: help with filtering data using a time function

2024-01-11 Thread 张河川


Flink version: 1.18
The scenario is as follows.
Columns of table A:
id, update_time (DATE type)
A sample row:
1,2023-01-12



I need to keep only the rows where update_time + 1 year is later than the current date.


I wrote a simple SQL query:
select 
id,update_time
from A
where TIMESTAMPADD(YEAR,1,update_time) > CURRENT_DATE;


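Result:
On January 11, 2024, the WHERE condition holds, so this row is not filtered out.
On January 12, 2024, the SQL does not trigger any recomputation to filter this row out.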
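The date arithmetic behind this result can be checked outside Flink. The sketch below uses java.time to evaluate the same condition, update_time + 1 year > CURRENT_DATE, for the sample row on both days, and contrasts it with a decision taken once when the row arrives. It only models the behavior described above (each row is judged on arrival and never revisited); `FilterTimingSketch` and `keep` are hypothetical names, not Flink APIs.

```java
import java.time.LocalDate;

public class FilterTimingSketch {

    // Same condition as the WHERE clause: TIMESTAMPADD(YEAR, 1, update_time) > CURRENT_DATE
    static boolean keep(LocalDate updateTime, LocalDate today) {
        return updateTime.plusYears(1).isAfter(today);
    }

    public static void main(String[] args) {
        LocalDate updateTime = LocalDate.of(2023, 1, 12);
        LocalDate day1 = LocalDate.of(2024, 1, 11);
        LocalDate day2 = LocalDate.of(2024, 1, 12);

        // Assumed model: the decision is taken once, when the row arrives on day1.
        boolean decidedAtArrival = keep(updateTime, day1);

        System.out.println("on 2024-01-11: " + keep(updateTime, day1));  // true
        System.out.println("on 2024-01-12: " + keep(updateTime, day2));  // false (2024-01-12 is not after itself)
        System.out.println("decision from arrival: " + decidedAtArrival); // true, never revisited
    }
}
```

The condition flips exactly on 2024-01-12, but a decision recorded at arrival does not change on its own, which is the gap the question is about.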


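In the real scenario, update_time spans many years. Some rows need +1 year and others need no offset, with the comparison done at day precision; still other rows need +3 years, where the comparison only needs year precision.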
How can I make the filter be re-evaluated in real time as CURRENT_DATE changes?
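The mixed rules in the real scenario can be written as one predicate with two parameters: the number of years to add and the precision of the comparison. The sketch below is one possible reading of the requirement, not a Flink feature: `RetentionRuleSketch`, `Precision`, `keep`, and the use of a strict "greater than" at year precision are assumptions made for illustration. How each row is mapped to its rule is not stated in the question, so the caller passes the offset and precision explicitly.

```java
import java.time.LocalDate;

public class RetentionRuleSketch {

    // Hypothetical: granularity at which the shifted date is compared with today.
    enum Precision { DAY, YEAR }

    /**
     * Keeps a row when update_time shifted by offsetYears is later than today,
     * compared either by full date (DAY) or by calendar year only (YEAR).
     */
    static boolean keep(LocalDate updateTime, int offsetYears, Precision precision, LocalDate today) {
        LocalDate shifted = updateTime.plusYears(offsetYears);
        switch (precision) {
            case DAY:
                return shifted.isAfter(today);
            case YEAR:
                return shifted.getYear() > today.getYear();
            default:
                throw new IllegalArgumentException("unknown precision: " + precision);
        }
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2024, 1, 12);
        // +1 year, day precision: 2024-01-12 is not after 2024-01-12
        System.out.println(keep(LocalDate.of(2023, 1, 12), 1, Precision.DAY, today));  // false
        // no offset, day precision
        System.out.println(keep(LocalDate.of(2024, 3, 1), 0, Precision.DAY, today));   // true
        // +3 years, year precision: 2024 is not greater than 2024
        System.out.println(keep(LocalDate.of(2021, 6, 30), 3, Precision.YEAR, today)); // false
    }
}
```

Whatever mechanism ends up re-running the filter, it needs a predicate of this shape evaluated against the current date each time.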


Thanks in advance, everyone.