Re:Re: 关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章
|




32684
|
COMPLETED
| 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B |
| | 32683 |
COMPLETED
| 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B |
| | 32682 |
COMPLETED
| 8/8 | 13:32:36 | 13:32:39 | 2s | 126 KB | 0 B |
| | 32681 |
COMPLETED
| 8/8 | 13:22:36 | 13:22:39 | 2s | 125 KB | 0 B |
| | 32680 |
COMPLETED
| 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B |
| | 32679 |
COMPLETED
| 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B |
上图是checkpoint


这个是在11月30号0时段生成的文件
2021-11-30 00:00:011080827 athena_other-0-217891.gz
2021-11-30 00:02:424309209 athena_other-0-217892.gz
2021-11-30 00:12:403902474 athena_other-0-217893.gz
2021-11-30 00:22:403886322 athena_other-0-217894.gz
2021-11-30 00:32:403988037 athena_other-0-217895.gz
2021-11-30 00:42:403892343 athena_other-0-217896.gz
2021-11-30 00:52:392972183 athena_other-0-217897.gz
2021-11-30 00:00:011125774 athena_other-1-219679.gz
2021-11-30 00:02:424338748 athena_other-1-219680.gz
2021-11-30 00:12:404204571 athena_other-1-219681.gz
2021-11-30 00:22:403852791 athena_other-1-219682.gz
2021-11-30 00:32:404025214 athena_other-1-219683.gz
2021-11-30 00:42:404205107 athena_other-1-219684.gz
2021-11-30 00:52:392922192 athena_other-1-219685.gz
2021-11-30 00:00:011103734 athena_other-2-220084.gz


这个是1点生成的文件
2021-11-30 01:00:011228793 athena_other-0-217951.gz
2021-11-30 01:02:424243566 athena_other-0-217952.gz
2021-11-30 01:12:404106305 athena_other-0-217953.gz
2021-11-30 01:22:404456214 athena_other-0-217954.gz
2021-11-30 01:32:414303156 athena_other-0-217955.gz
2021-11-30 01:42:404688872 athena_other-0-217956.gz
2021-11-30 01:52:403251910 athena_other-0-217957.gz
2021-11-30 01:00:011163354 athena_other-1-219736.gz
2021-11-30 01:02:424405233 athena_other-1-219737.gz
2021-11-30 01:12:404094502 athena_other-1-219738.gz
2021-11-30 01:22:404395071 athena_other-1-219739.gz
2021-11-30 01:32:404205169 athena_other-1-219740.gz
2021-11-30 01:42:404432610 athena_other-1-219741.gz
2021-11-30 01:52:403224111 athena_other-1-219742.gz
2021-11-30 01:00:011163964 athena_other-2-220137.gz




之前的截图无法发送,我把文件贴出来,打扰了







在 2021-12-02 13:52:43,"黄志高"  写道:




Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy














在 2021-12-02 11:37:31,"Caizhi Weng"  写道:
>Hi!
>
>邮件里看不到图片和附件,建议使用外部图床。
>
>partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
>exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。
>
>黄志高  于2021年12月1日周三 下午9:53写道:
>
>> hi,各位大佬,咨询个问题
>>
>>  
>> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
>>
>>
>>
>>





 

Re: 关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章



Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy














在 2021-12-02 11:37:31,"Caizhi Weng"  写道:
>Hi!
>
>邮件里看不到图片和附件,建议使用外部图床。
>
>partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
>exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。
>
>黄志高  于2021年12月1日周三 下午9:53写道:
>
>> hi,各位大佬,咨询个问题
>>
>>  
>> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
>>
>>
>>
>>


关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章
hi,各位大佬,咨询个问题
   
我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看

Re:Re: Re: 关于flink sql自定义udf,eval方法被调用两次问题

2021-08-15 文章






感谢提供帮助











在 2021-08-16 11:31:11,"Qishang"  写道:
>Hi
>之前社区发过一个 JD 的解决方案,可以参考下[1]。
>
>[1]: https://mp.weixin.qq.com/s/YluIj3vmebFmZjRbymKBZw
>
>
>
>黄志高  于2021年8月16日周一 上午11:04写道:
>
>> == Physical Execution Plan ==
>>
>> Stage 1 : Data Source
>>
>> content : Source: TableSourceScan(table=[[default_catalog,
>> default_database, test_kafka]], fields=[tz])
>>
>>
>>
>>
>> Stage 2 : Operator
>>
>> content : Calc(select=[tt1(tz) AS tz], where=[tt1(tz) IS NOT NULL])
>>
>> ship_strategy : FORWARD
>>
>>
>>
>> 从执行计划中看出,在select与where中这个tt1(tz)的udf确实调用了两次,看issuse,目前还没有被分配,是否有什么办法可以规避
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2021-08-16 09:47:28,"Qishang"  写道:
>> >Hi.
>> >
>> >应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573
>> >
>> >打印一下执行计划和code gen
>> >
>> >
>> >黄志高  于2021年8月15日周日 下午10:06写道:
>> >
>> >> hi all,
>> >> 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from
>> >> test_kafka) as t where tz is not null
>> >> 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from
>> >> test_kafka) as t这个sql时,不进行where tz is not null
>> 操作,eval方法此时只会调用一次,如果将where
>> >> tz is not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null
>> >> 问题,我进行重写is not null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from
>> >> test_kafka这个阶段发生一次,第二次是在where tz is not
>> >>
>> null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12
>> >>
>> >>
>> >>
>> >>
>> >>
>>


Re:Re: 关于flink sql自定义udf,eval方法被调用两次问题

2021-08-15 文章
== Physical Execution Plan ==

Stage 1 : Data Source

content : Source: TableSourceScan(table=[[default_catalog, default_database, 
test_kafka]], fields=[tz])




Stage 2 : Operator

content : Calc(select=[tt1(tz) AS tz], where=[tt1(tz) IS NOT NULL])

ship_strategy : FORWARD



从执行计划中看出,在select与where中这个tt1(tz)的udf确实调用了两次,看issuse,目前还没有被分配,是否有什么办法可以规避

















在 2021-08-16 09:47:28,"Qishang"  写道:
>Hi.
>
>应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573
>
>打印一下执行计划和code gen
>
>
>黄志高  于2021年8月15日周日 下午10:06写道:
>
>> hi all,
>> 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from
>> test_kafka) as t where tz is not null
>> 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from
>> test_kafka) as t这个sql时,不进行where tz is not null 操作,eval方法此时只会调用一次,如果将where
>> tz is not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null
>> 问题,我进行重写is not null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from
>> test_kafka这个阶段发生一次,第二次是在where tz is not
>> null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12
>>
>>
>>
>>
>>


关于flink sql自定义udf,eval方法被调用两次问题

2021-08-15 文章
hi all,
各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from 
test_kafka) as t where tz is not null 
时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from 
test_kafka) as t这个sql时,不进行where tz is not null 操作,eval方法此时只会调用一次,如果将where tz is 
not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null 问题,我进行重写is not 
null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from 
test_kafka这个阶段发生一次,第二次是在where tz is not 
null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12



Re:Re:关于任务运行一定时间后,physical内存超出,container被kill,导致任务重启

2021-07-07 文章
目前虚拟内存和物理内存比例为5,而且我这是物理内存超出,并非虚拟内存




















在 2021-07-08 10:49:08,"Roc Marshal"  写道:
>Hi, 
>可以先校对一下yarn的container的虚拟内存和物理内存比例的阈值参数(yarn-site.xml)。
>
>
>   
>
>祝好,Roc.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-07-08 10:44:20,"黄志高"  写道:
>>flink环境1.11.0
>>任务部署方式yarn per-job
>>状态后台设置的是:env.setStateBackend(new FsStateBackend("ckPath"))
>>每个taskManager分配8g内存,2个slot
>>每10分钟做一次checkpoint,每次ck大小平均400k
>>任务逻辑是:source(kafka)->keyBy->timeWindow->reduce的count计数->redis
>> source(kafka)->sink(s3 文件)
>>
>>
>>问题是任务每天都会应该container被杀,导致任务重启
>>Container [pid=26148,containerID=container_e02_1622516404559_0038_01_08] 
>>is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB 
>>physical memory used; 9.8 GB of 40 GB virtual memory used. Killing container
>>
>>
>>我的理解是缓存数据应该不会那么多,怎么就能达到物理内存限制呢,我的window操作,理应都是key下对应一个值,key的数据也不多,缓存应该也只记录这个状态,而且window采用的是reduce操作,来一条处理一条,增量处理,而不是processFunction的攒一批处理一次
>>望各位大佬帮忙看看,感谢
>>
>>
>>


关于任务运行一定时间后,physical内存超出,container被kill,导致任务重启

2021-07-07 文章
flink环境1.11.0
任务部署方式yarn per-job
状态后台设置的是:env.setStateBackend(new FsStateBackend("ckPath"))
每个taskManager分配8g内存,2个slot
每10分钟做一次checkpoint,每次ck大小平均400k
任务逻辑是:source(kafka)->keyBy->timeWindow->reduce的count计数->redis
 source(kafka)->sink(s3 文件)


问题是任务每天都会应该container被杀,导致任务重启
Container [pid=26148,containerID=container_e02_1622516404559_0038_01_08] is 
running beyond physical memory limits. Current usage: 8.0 GB of 8 GB physical 
memory used; 9.8 GB of 40 GB virtual memory used. Killing container


我的理解是缓存数据应该不会那么多,怎么就能达到物理内存限制呢,我的window操作,理应都是key下对应一个值,key的数据也不多,缓存应该也只记录这个状态,而且window采用的是reduce操作,来一条处理一条,增量处理,而不是processFunction的攒一批处理一次
望各位大佬帮忙看看,感谢





(无主题)

2021-03-16 文章
各位大佬,在做远程提交任务到flink的standalone模式,抛以下异常
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:277)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:981)
at 
cn.encdata.cloud.dataengine.core.RemoteEnvironment.execute(RemoteEnvironment.java:145)
at 
cn.encdata.cloud.dataengine.core.builder.BatchJobBuilder.execute(BatchJobBuilder.java:89)
... 25 common frames omitted
Caused by: java.lang.InterruptedException: null
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:976)