Re: Flink Over Window功能存储状态优化能有性能提升吗?

2021-10-19 文章 Tianwang Li
flink1.13 和 flink 1.14

Caizhi Weng  于2021年10月20日周三 上午10:17写道:

> Hi!
>
> 会将 input 的记录存储在 state 里面。
>
> 如果 input 的字段比较多,但是参与聚合运算的字段比较少。
>
> 这样会导致 state 非常的大。
> >
>
> 你使用的是哪个 Flink 版本以及哪个 planner?这个观察是如何得出的呢?就我所知,state 里应该只存储了参与 agg 运算的字段。
>
> Tianwang Li  于2021年10月19日周二 下午8:34写道:
>
> > Flink 的 Over 窗口
> > 例如在 range over window 场合,会将 input 的记录存储在 state 里面。
> > 如果 input 的字段比较多,但是参与聚合运算的字段比较少。
> > 这样会导致 state 非常的大。
> >
> > 从 RowTimeRangeBoundedPrecedingFunction 里面逻辑看,
> > 不参与agg运算的字段,在 onTimer 时期输出之后,是可以清理了的。
> >
> > 这样能提升Over 窗口的处理性能吗?
> >
> > SQL例子:
> >
> > SELECT
> > col_1,
> > col_2,
> > col_3,
> > col_4,
> > col_5,
> > col_6, -- 字段内容比较长
> > col_7, -- 字段内容比较长
> > col_8, -- 字段内容比较长
> > col_9, -- 字段内容比较长
> > col_10,
> > col_11,
> > col_12,
> > col_13,
> > col_14,
> > col_15,
> > col_16,
> > col_17,
> > col_18,
> > col_19,
> > sum(col_10) OVER w AS day_col_10,
> > sum(col_11) OVER w AS day_col_11,
> > sum(col_12) OVER w AS day_col_12,
> > sum(col_13) OVER w AS day_col_13,
> > sum(col_14) OVER w AS day_col_14,
> > sum(col_15) OVER w AS day_col_15,
> > sum(col_16) OVER w AS day_col_16
> > FROM table_3
> > window w as (
> > PARTITION BY col_1, col_2
> > ORDER BY rowtime
> > RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW)
> >
> > --
> > **
> >  tivanli
> > **
> >
>


-- 
**
 tivanli
**


Re: Flink Over Window功能存储状态优化能有性能提升吗?

2021-10-19 文章 Caizhi Weng
Hi!

会将 input 的记录存储在 state 里面。

如果 input 的字段比较多,但是参与聚合运算的字段比较少。

这样会导致 state 非常的大。
>

你使用的是哪个 Flink 版本以及哪个 planner?这个观察是如何得出的呢?就我所知,state 里应该只存储了参与 agg 运算的字段。

Tianwang Li  于2021年10月19日周二 下午8:34写道:

> Flink 的 Over 窗口
> 例如在 range over window 场合,会将 input 的记录存储在 state 里面。
> 如果 input 的字段比较多,但是参与聚合运算的字段比较少。
> 这样会导致 state 非常的大。
>
> 从 RowTimeRangeBoundedPrecedingFunction 里面逻辑看,
> 不参与agg运算的字段,在 onTimer 时期输出之后,是可以清理了的。
>
> 这样能提升Over 窗口的处理性能吗?
>
> SQL例子:
>
> SELECT
> col_1,
> col_2,
> col_3,
> col_4,
> col_5,
> col_6, -- 字段内容比较长
> col_7, -- 字段内容比较长
> col_8, -- 字段内容比较长
> col_9, -- 字段内容比较长
> col_10,
> col_11,
> col_12,
> col_13,
> col_14,
> col_15,
> col_16,
> col_17,
> col_18,
> col_19,
> sum(col_10) OVER w AS day_col_10,
> sum(col_11) OVER w AS day_col_11,
> sum(col_12) OVER w AS day_col_12,
> sum(col_13) OVER w AS day_col_13,
> sum(col_14) OVER w AS day_col_14,
> sum(col_15) OVER w AS day_col_15,
> sum(col_16) OVER w AS day_col_16
> FROM table_3
> window w as (
> PARTITION BY col_1, col_2
> ORDER BY rowtime
> RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW)
>
> --
> **
>  tivanli
> **
>


回复:flink作业的停止

2021-10-19 文章 lei-tian
Hi , yuepeng-pan:
你好,我这边提交的是数据yarn的per-job的模式,Flink的UI界面上在任务running状态下可以看到jobmanager的日志和taskmanager的日志,任务finished或者failed后它会出现在UI界面上的Completed
 Job List,同时左边的tm的点击去后已经没有相关信息,只有jm有相关信息,
应该是JM资源没有被释放。


| |
lei-tian
|
|
totorobabyf...@163.com
|
签名由网易邮箱大师定制
在2021年10月19日 10:53,Yuepeng Pan 写道:
Hi,
lei-tian.
基于你的描述,我推测(flink-1.10+)会存在这几种可能。
1. 使用了 
flink的yarn-session模式,然后进行了相应的作业提交。这种情况表现为,作业最终完成后,yarn中对应flink集群的taskmanager 
container能够被释放掉,但是只是保留了Jobmanager组件的容器没有释放。在 flink的yarn-session模式 的部署方式中,这是正常的。
2. 不论是在flink yarn 的per-job部署模式或者yarn-session部署模式中,如果负责hbase source/sink的 
算子与其他任意一种流模式的算子进行connect或者union等多流的计算,那么将会导致hbase 
IO结束后,剩余的流式算子还是处于正常运行状态,这种情况下的大概表现为 yarn中 flink taskmanager 
container和jobmanager container 都未释放。
3.其他。
如果作业所有的souce都是读取"批模式"的数据源,比如 mysql/hbase 而非包含kafka/pulsar等,那么你可以尝试flink on yarn 
的per-job的部署方式运行任务。




祝好。
Roc











在 2021-10-18 21:31:21,"lei-tian"  写道:
您好:
我用flink读取hbase或者文件的数据,读完数据之后页面显示状态为finished,但是这个任务在yarn上的资源还没有被释放,仍然在yarn的running列表里面,需要通过命令行kill掉这个任务,有什么解决的方法么。我没有设置batch或者streaming的模式,用的是默认的。


| |
lei-tian
|
|
totorobabyf...@163.com
|
签名由网易邮箱大师定制


Flink Over Window功能存储状态优化能有性能提升吗?

2021-10-19 文章 Tianwang Li
Flink 的 Over 窗口
例如在 range over window 场合,会将 input 的记录存储在 state 里面。
如果 input 的字段比较多,但是参与聚合运算的字段比较少。
这样会导致 state 非常的大。

从 RowTimeRangeBoundedPrecedingFunction 里面逻辑看,
不参与agg运算的字段,在 onTimer 时期输出之后,是可以清理了的。

这样能提升Over 窗口的处理性能吗?

SQL例子:

SELECT
col_1,
col_2,
col_3,
col_4,
col_5,
col_6, -- 字段内容比较长
col_7, -- 字段内容比较长
col_8, -- 字段内容比较长
col_9, -- 字段内容比较长
col_10,
col_11,
col_12,
col_13,
col_14,
col_15,
col_16,
col_17,
col_18,
col_19,
sum(col_10) OVER w AS day_col_10,
sum(col_11) OVER w AS day_col_11,
sum(col_12) OVER w AS day_col_12,
sum(col_13) OVER w AS day_col_13,
sum(col_14) OVER w AS day_col_14,
sum(col_15) OVER w AS day_col_15,
sum(col_16) OVER w AS day_col_16
FROM table_3
window w as (
PARTITION BY col_1, col_2
ORDER BY rowtime
RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW)

-- 
**
 tivanli
**


?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 文章 xuzh

Exception in thread Thread-14:
Traceback (most recent call last):
 File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in 
_bootstrap_inner
  self.run()
 File 
"D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py",
 line 218, in run
  while not self._finished.wait(next_call - time.time()):
 File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
  signaled = self._cond.wait(timeout)
 File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
  gotit = waiter.acquire(True, timeout)
OverflowError: timeout value is too large

?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 文章 xuzh
----
??: 
   "user-zh"



?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 文章 xuzh
??udfudfjar??














--  --
??: 
   "user-zh"



?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 文章 xuzh
??udfudfjar??













----
??: 
   "user-zh"



Re: flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-19 文章 Lei Wang
确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。
程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G


On Mon, Oct 18, 2021 at 10:44 AM Michael Ran  wrote:

> 应该和OSS没关系吧,毕竟只是个存储。
> 我们CPU 你先看看消耗在哪个线程或者方法类呗
>
>
>
> 在 2021-10-08 16:34:47,"Lei Wang"  写道:
>
>
>
> flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
> 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。
>
>
>
>
>
>
> 这个可能的原因是什么?会跟 OSS 有关吗?
>
>
> 谢谢,
> 王磊