Re:Re: 关于streamFileSink在checkpoint下生成文件问题
| 32684 | COMPLETED | 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B | | | 32683 | COMPLETED | 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B | | | 32682 | COMPLETED | 8/8 | 13:32:36 | 13:32:39 | 2s | 126 KB | 0 B | | | 32681 | COMPLETED | 8/8 | 13:22:36 | 13:22:39 | 2s | 125 KB | 0 B | | | 32680 | COMPLETED | 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B | | | 32679 | COMPLETED | 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B | 上图是checkpoint 这个是在11月30号0时段生成的文件 2021-11-30 00:00:011080827 athena_other-0-217891.gz 2021-11-30 00:02:424309209 athena_other-0-217892.gz 2021-11-30 00:12:403902474 athena_other-0-217893.gz 2021-11-30 00:22:403886322 athena_other-0-217894.gz 2021-11-30 00:32:403988037 athena_other-0-217895.gz 2021-11-30 00:42:403892343 athena_other-0-217896.gz 2021-11-30 00:52:392972183 athena_other-0-217897.gz 2021-11-30 00:00:011125774 athena_other-1-219679.gz 2021-11-30 00:02:424338748 athena_other-1-219680.gz 2021-11-30 00:12:404204571 athena_other-1-219681.gz 2021-11-30 00:22:403852791 athena_other-1-219682.gz 2021-11-30 00:32:404025214 athena_other-1-219683.gz 2021-11-30 00:42:404205107 athena_other-1-219684.gz 2021-11-30 00:52:392922192 athena_other-1-219685.gz 2021-11-30 00:00:011103734 athena_other-2-220084.gz 这个是1点生成的文件 2021-11-30 01:00:011228793 athena_other-0-217951.gz 2021-11-30 01:02:424243566 athena_other-0-217952.gz 2021-11-30 01:12:404106305 athena_other-0-217953.gz 2021-11-30 01:22:404456214 athena_other-0-217954.gz 2021-11-30 01:32:414303156 athena_other-0-217955.gz 2021-11-30 01:42:404688872 athena_other-0-217956.gz 2021-11-30 01:52:403251910 athena_other-0-217957.gz 2021-11-30 01:00:011163354 athena_other-1-219736.gz 2021-11-30 01:02:424405233 athena_other-1-219737.gz 2021-11-30 01:12:404094502 athena_other-1-219738.gz 2021-11-30 01:22:404395071 athena_other-1-219739.gz 2021-11-30 01:32:404205169 athena_other-1-219740.gz 2021-11-30 01:42:404432610 athena_other-1-219741.gz 2021-11-30 01:52:403224111 athena_other-1-219742.gz 2021-11-30 01:00:011163964 athena_other-2-220137.gz 之前的截图无法发送,我把文件贴出来,打扰了 在 2021-12-02 13:52:43,"黄志高" 写道: Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy 在 2021-12-02 11:37:31,"Caizhi Weng" 写道: >Hi! > >邮件里看不到图片和附件,建议使用外部图床。 > >partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证 >exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。 > >黄志高 于2021年12月1日周三 下午9:53写道: > >> hi,各位大佬,咨询个问题 >> >> >> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看 >> >> >> >>
Re: 关于streamFileSink在checkpoint下生成文件问题
Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy 在 2021-12-02 11:37:31,"Caizhi Weng" 写道: >Hi! > >邮件里看不到图片和附件,建议使用外部图床。 > >partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证 >exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。 > >黄志高 于2021年12月1日周三 下午9:53写道: > >> hi,各位大佬,咨询个问题 >> >> >> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看 >> >> >> >>
关于streamFileSink在checkpoint下生成文件问题
hi,各位大佬,咨询个问题 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
Re:Re: Re: 关于flink sql自定义udf,eval方法被调用两次问题
感谢提供帮助 在 2021-08-16 11:31:11,"Qishang" 写道: >Hi >之前社区发过一个 JD 的解决方案,可以参考下[1]。 > >[1]: https://mp.weixin.qq.com/s/YluIj3vmebFmZjRbymKBZw > > > >黄志高 于2021年8月16日周一 上午11:04写道: > >> == Physical Execution Plan == >> >> Stage 1 : Data Source >> >> content : Source: TableSourceScan(table=[[default_catalog, >> default_database, test_kafka]], fields=[tz]) >> >> >> >> >> Stage 2 : Operator >> >> content : Calc(select=[tt1(tz) AS tz], where=[tt1(tz) IS NOT NULL]) >> >> ship_strategy : FORWARD >> >> >> >> 从执行计划中看出,在select与where中这个tt1(tz)的udf确实调用了两次,看issuse,目前还没有被分配,是否有什么办法可以规避 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2021-08-16 09:47:28,"Qishang" 写道: >> >Hi. >> > >> >应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573 >> > >> >打印一下执行计划和code gen >> > >> > >> >黄志高 于2021年8月15日周日 下午10:06写道: >> > >> >> hi all, >> >> 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from >> >> test_kafka) as t where tz is not null >> >> 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from >> >> test_kafka) as t这个sql时,不进行where tz is not null >> 操作,eval方法此时只会调用一次,如果将where >> >> tz is not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null >> >> 问题,我进行重写is not null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from >> >> test_kafka这个阶段发生一次,第二次是在where tz is not >> >> >> null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12 >> >> >> >> >> >> >> >> >> >> >>
Re:Re: 关于flink sql自定义udf,eval方法被调用两次问题
== Physical Execution Plan == Stage 1 : Data Source content : Source: TableSourceScan(table=[[default_catalog, default_database, test_kafka]], fields=[tz]) Stage 2 : Operator content : Calc(select=[tt1(tz) AS tz], where=[tt1(tz) IS NOT NULL]) ship_strategy : FORWARD 从执行计划中看出,在select与where中这个tt1(tz)的udf确实调用了两次,看issuse,目前还没有被分配,是否有什么办法可以规避 在 2021-08-16 09:47:28,"Qishang" 写道: >Hi. > >应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573 > >打印一下执行计划和code gen > > >黄志高 于2021年8月15日周日 下午10:06写道: > >> hi all, >> 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from >> test_kafka) as t where tz is not null >> 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from >> test_kafka) as t这个sql时,不进行where tz is not null 操作,eval方法此时只会调用一次,如果将where >> tz is not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null >> 问题,我进行重写is not null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from >> test_kafka这个阶段发生一次,第二次是在where tz is not >> null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12 >> >> >> >> >>
关于flink sql自定义udf,eval方法被调用两次问题
hi all, 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from test_kafka) as t where tz is not null 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from test_kafka) as t这个sql时,不进行where tz is not null 操作,eval方法此时只会调用一次,如果将where tz is not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null 问题,我进行重写is not null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from test_kafka这个阶段发生一次,第二次是在where tz is not null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12
Re:Re:关于任务运行一定时间后,physical内存超出,container被kill,导致任务重启
目前虚拟内存和物理内存比例为5,而且我这是物理内存超出,并非虚拟内存 在 2021-07-08 10:49:08,"Roc Marshal" 写道: >Hi, >可以先校对一下yarn的container的虚拟内存和物理内存比例的阈值参数(yarn-site.xml)。 > > > > >祝好,Roc. > > > > > > > > > > > > > > >在 2021-07-08 10:44:20,"黄志高" 写道: >>flink环境1.11.0 >>任务部署方式yarn per-job >>状态后台设置的是:env.setStateBackend(new FsStateBackend("ckPath")) >>每个taskManager分配8g内存,2个slot >>每10分钟做一次checkpoint,每次ck大小平均400k >>任务逻辑是:source(kafka)->keyBy->timeWindow->reduce的count计数->redis >> source(kafka)->sink(s3 文件) >> >> >>问题是任务每天都会应该container被杀,导致任务重启 >>Container [pid=26148,containerID=container_e02_1622516404559_0038_01_08] >>is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB >>physical memory used; 9.8 GB of 40 GB virtual memory used. Killing container >> >> >>我的理解是缓存数据应该不会那么多,怎么就能达到物理内存限制呢,我的window操作,理应都是key下对应一个值,key的数据也不多,缓存应该也只记录这个状态,而且window采用的是reduce操作,来一条处理一条,增量处理,而不是processFunction的攒一批处理一次 >>望各位大佬帮忙看看,感谢 >> >> >>
关于任务运行一定时间后,physical内存超出,container被kill,导致任务重启
flink环境1.11.0 任务部署方式yarn per-job 状态后台设置的是:env.setStateBackend(new FsStateBackend("ckPath")) 每个taskManager分配8g内存,2个slot 每10分钟做一次checkpoint,每次ck大小平均400k 任务逻辑是:source(kafka)->keyBy->timeWindow->reduce的count计数->redis source(kafka)->sink(s3 文件) 问题是任务每天都会应该container被杀,导致任务重启 Container [pid=26148,containerID=container_e02_1622516404559_0038_01_08] is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB physical memory used; 9.8 GB of 40 GB virtual memory used. Killing container 我的理解是缓存数据应该不会那么多,怎么就能达到物理内存限制呢,我的window操作,理应都是key下对应一个值,key的数据也不多,缓存应该也只记录这个状态,而且window采用的是reduce操作,来一条处理一条,增量处理,而不是processFunction的攒一批处理一次 望各位大佬帮忙看看,感谢
(无主题)
各位大佬,在做远程提交任务到flink的standalone模式,抛以下异常 Caused by: java.lang.RuntimeException: java.lang.InterruptedException at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:277) at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:981) at cn.encdata.cloud.dataengine.core.RemoteEnvironment.execute(RemoteEnvironment.java:145) at cn.encdata.cloud.dataengine.core.builder.BatchJobBuilder.execute(BatchJobBuilder.java:89) ... 25 common frames omitted Caused by: java.lang.InterruptedException: null at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:976)