Re: Flink Over Window功能存储状态优化能有性能提升吗?
flink1.13 和 flink 1.14 Caizhi Weng 于2021年10月20日周三 上午10:17写道: > Hi! > > 会将 input 的记录存储在 state 里面。 > > 如果 input 的字段比较多,但是参与聚合运算的字段比较少。 > > 这样会导致 state 非常的大。 > > > > 你使用的是哪个 Flink 版本以及哪个 planner?这个观察是如何得出的呢?就我所知,state 里应该只存储了参与 agg 运算的字段。 > > Tianwang Li 于2021年10月19日周二 下午8:34写道: > > > Flink 的 Over 窗口 > > 例如在 range over window 场合,会将 input 的记录存储在 state 里面。 > > 如果 input 的字段比较多,但是参与聚合运算的字段比较少。 > > 这样会导致 state 非常的大。 > > > > 从 RowTimeRangeBoundedPrecedingFunction 里面逻辑看, > > 不参与agg运算的字段,在 onTimer 时期输出之后,是可以清理了的。 > > > > 这样能提升Over 窗口的处理性能吗? > > > > SQL例子: > > > > SELECT > > col_1, > > col_2, > > col_3, > > col_4, > > col_5, > > col_6, -- 字段内容比较长 > > col_7, -- 字段内容比较长 > > col_8, -- 字段内容比较长 > > col_9, -- 字段内容比较长 > > col_10, > > col_11, > > col_12, > > col_13, > > col_14, > > col_15, > > col_16, > > col_17, > > col_18, > > col_19, > > sum(col_10) OVER w AS day_col_10, > > sum(col_11) OVER w AS day_col_11, > > sum(col_12) OVER w AS day_col_12, > > sum(col_13) OVER w AS day_col_13, > > sum(col_14) OVER w AS day_col_14, > > sum(col_15) OVER w AS day_col_15, > > sum(col_16) OVER w AS day_col_16 > > FROM table_3 > > window w as ( > > PARTITION BY col_1, col_2 > > ORDER BY rowtime > > RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW) > > > > -- > > ** > > tivanli > > ** > > > -- ** tivanli **
Re: Flink Over Window功能存储状态优化能有性能提升吗?
Hi! 会将 input 的记录存储在 state 里面。 如果 input 的字段比较多,但是参与聚合运算的字段比较少。 这样会导致 state 非常的大。 > 你使用的是哪个 Flink 版本以及哪个 planner?这个观察是如何得出的呢?就我所知,state 里应该只存储了参与 agg 运算的字段。 Tianwang Li 于2021年10月19日周二 下午8:34写道: > Flink 的 Over 窗口 > 例如在 range over window 场合,会将 input 的记录存储在 state 里面。 > 如果 input 的字段比较多,但是参与聚合运算的字段比较少。 > 这样会导致 state 非常的大。 > > 从 RowTimeRangeBoundedPrecedingFunction 里面逻辑看, > 不参与agg运算的字段,在 onTimer 时期输出之后,是可以清理了的。 > > 这样能提升Over 窗口的处理性能吗? > > SQL例子: > > SELECT > col_1, > col_2, > col_3, > col_4, > col_5, > col_6, -- 字段内容比较长 > col_7, -- 字段内容比较长 > col_8, -- 字段内容比较长 > col_9, -- 字段内容比较长 > col_10, > col_11, > col_12, > col_13, > col_14, > col_15, > col_16, > col_17, > col_18, > col_19, > sum(col_10) OVER w AS day_col_10, > sum(col_11) OVER w AS day_col_11, > sum(col_12) OVER w AS day_col_12, > sum(col_13) OVER w AS day_col_13, > sum(col_14) OVER w AS day_col_14, > sum(col_15) OVER w AS day_col_15, > sum(col_16) OVER w AS day_col_16 > FROM table_3 > window w as ( > PARTITION BY col_1, col_2 > ORDER BY rowtime > RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW) > > -- > ** > tivanli > ** >
回复:flink作业的停止
Hi , yuepeng-pan: 你好,我这边提交的是数据yarn的per-job的模式,Flink的UI界面上在任务running状态下可以看到jobmanager的日志和taskmanager的日志,任务finished或者failed后它会出现在UI界面上的Completed Job List,同时左边的tm的点击去后已经没有相关信息,只有jm有相关信息, 应该是JM资源没有被释放。 | | lei-tian | | totorobabyf...@163.com | 签名由网易邮箱大师定制 在2021年10月19日 10:53,Yuepeng Pan 写道: Hi, lei-tian. 基于你的描述,我推测(flink-1.10+)会存在这几种可能。 1. 使用了 flink的yarn-session模式,然后进行了相应的作业提交。这种情况表现为,作业最终完成后,yarn中对应flink集群的taskmanager container能够被释放掉,但是只是保留了Jobmanager组件的容器没有释放。在 flink的yarn-session模式 的部署方式中,这是正常的。 2. 不论是在flink yarn 的per-job部署模式或者yarn-session部署模式中,如果负责hbase source/sink的 算子与其他任意一种流模式的算子进行connect或者union等多流的计算,那么将会导致hbase IO结束后,剩余的流式算子还是处于正常运行状态,这种情况下的大概表现为 yarn中 flink taskmanager container和jobmanager container 都未释放。 3.其他。 如果作业所有的souce都是读取"批模式"的数据源,比如 mysql/hbase 而非包含kafka/pulsar等,那么你可以尝试flink on yarn 的per-job的部署方式运行任务。 祝好。 Roc 在 2021-10-18 21:31:21,"lei-tian" 写道: 您好: 我用flink读取hbase或者文件的数据,读完数据之后页面显示状态为finished,但是这个任务在yarn上的资源还没有被释放,仍然在yarn的running列表里面,需要通过命令行kill掉这个任务,有什么解决的方法么。我没有设置batch或者streaming的模式,用的是默认的。 | | lei-tian | | totorobabyf...@163.com | 签名由网易邮箱大师定制
Flink Over Window功能存储状态优化能有性能提升吗?
Flink 的 Over 窗口 例如在 range over window 场合,会将 input 的记录存储在 state 里面。 如果 input 的字段比较多,但是参与聚合运算的字段比较少。 这样会导致 state 非常的大。 从 RowTimeRangeBoundedPrecedingFunction 里面逻辑看, 不参与agg运算的字段,在 onTimer 时期输出之后,是可以清理了的。 这样能提升Over 窗口的处理性能吗? SQL例子: SELECT col_1, col_2, col_3, col_4, col_5, col_6, -- 字段内容比较长 col_7, -- 字段内容比较长 col_8, -- 字段内容比较长 col_9, -- 字段内容比较长 col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, sum(col_10) OVER w AS day_col_10, sum(col_11) OVER w AS day_col_11, sum(col_12) OVER w AS day_col_12, sum(col_13) OVER w AS day_col_13, sum(col_14) OVER w AS day_col_14, sum(col_15) OVER w AS day_col_15, sum(col_16) OVER w AS day_col_16 FROM table_3 window w as ( PARTITION BY col_1, col_2 ORDER BY rowtime RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW) -- ** tivanli **
?????? pyflink 1.14.0 udf ??????????????????????????
Exception in thread Thread-14: Traceback (most recent call last): File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner self.run() File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run while not self._finished.wait(next_call - time.time()): File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait signaled = self._cond.wait(timeout) File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait gotit = waiter.acquire(True, timeout) OverflowError: timeout value is too large
?????? pyflink 1.14.0 udf ??????????????????????????
---- ??: "user-zh"
?????? pyflink 1.14.0 udf ??????????????????????????
??udfudfjar?? -- -- ??: "user-zh"
?????? pyflink 1.14.0 udf ??????????????????????????
??udfudfjar?? ---- ??: "user-zh"
Re: flink 以阿里云 oss 作为 checkpoint cpu 过高
确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。 程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G On Mon, Oct 18, 2021 at 10:44 AM Michael Ran wrote: > 应该和OSS没关系吧,毕竟只是个存储。 > 我们CPU 你先看看消耗在哪个线程或者方法类呗 > > > > 在 2021-10-08 16:34:47,"Lei Wang" 写道: > > > > flink 程序以 RocksDB 作为 stateBackend, aliyun OSS 作为 checkpoint 数据最终的物理位置。 > 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。 > > > > > > > 这个可能的原因是什么?会跟 OSS 有关吗? > > > 谢谢, > 王磊