Re:Flink写入CK数据丢失问题

2022-06-01 文章 lxk






补充一下图片


https://s2.loli.net/2022/06/02/C5it4rPFgmJlopZ.png
https://s2.loli.net/2022/06/02/3ri2HIv1RsAawBW.png
https://s2.loli.net/2022/06/02/efVWPvXCYFwhgTp.png
https://s2.loli.net/2022/06/02/9UptbNaWvs7xwXC.png







在 2022-06-02 11:38:24,"lxk"  写道:

各位,请教个问题
目前使用flink往ck写入数据,使用的是datastream 
api以及rocksdb状态后端,程序中了开了两个窗口,都是10秒级别。同时还使用了sql进行group by 
求和,求和的操作没有加窗口,同时streamtableenv 设置了状态生存时间为10s.
在跟离线端对比数据的时候发现,一段时间内的数据跟离线差异不大,从0点-17点(数据的事件时间),但是18点(事件时间)以后的数据实时端差异量特别大。
目前在webui上发现整个管理内存的使用率已经占满,不知道是否跟这个有关系。
还有一点现象是,今天的数据我们对比了ck上实时的表(正确的),总体数据量还是要小很多。但是当我从零点重新消费,目前来看今天的数据能够对上,不知道是否是因为程序运行一段时间后,整个管理内存都被占满了,从而导致原本缓存的数据丢失了。
以下是相应的算子链和整个tm内存情况。出现反压是因为从今天0点重新开始消费了。




 

Flink写入CK数据丢失问题

2022-06-01 文章 lxk
各位,请教个问题
目前使用flink往ck写入数据,使用的是datastream 
api以及rocksdb状态后端,程序中了开了两个窗口,都是10秒级别。同时还使用了sql进行group by 
求和,求和的操作没有加窗口,同时streamtableenv 设置了状态生存时间为10s.
在跟离线端对比数据的时候发现,一段时间内的数据跟离线差异不大,从0点-17点(数据的事件时间),但是18点(事件时间)以后的数据实时端差异量特别大。
目前在webui上发现整个管理内存的使用率已经占满,不知道是否跟这个有关系。
还有一点现象是,今天的数据我们对比了ck上实时的表(正确的),总体数据量还是要小很多。但是当我从零点重新消费,目前来看今天的数据能够对上,不知道是否是因为程序运行一段时间后,整个管理内存都被占满了,从而导致原本缓存的数据丢失了。
以下是相应的算子链和整个tm内存情况。出现反压是因为从今天0点重新开始消费了。

Re: 实现SupportsFilterPushDown接口过程中遇到的问题

2022-06-01 文章 Shengkai Fang
Hi.

我记得 Jdbc Connector 实现了 ProjectionPushDown。你可以参考着实现。

xuyang 老师说的对,getScanRuntimeProvider 发生在 push down
之后。应该不会有你说的问题。另外,可以考虑贡献到社区[1],我们也可以帮忙一起 review 下,帮忙解决你的问题?

Best,
Shengkai

[1] https://issues.apache.org/jira/browse/FLINK-19651

Xuyang  于2022年6月1日周三 23:47写道:

>
> Hi,在SQL优化的末尾,StreamPhysicalTableSourceScan转化为StreamExecTableSourceScan的过程中[1],会执行一次getScanRuntimeProvider,此时的getScanRuntimeProvider一定是在applyFilters方法之后触发的。
>
>
>
>
> 你可以尝试将filterFields记录在JdbcDynamicTableSource
> 这个类中,如果该值为空,则getScanRuntimeProvider
> 时无需拼接(在applyFilters执行之前一定是空的);当该值不为空的时候,在getScanRuntimeProvider
> 进行拼接(最后一次physical node转exec node时,一定执行过applyFilters方法)。
>
>
>
>
> [1]
> https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java#L83
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> 在 2022-06-01 20:03:58,"朱育锋"  写道:
>
> Hi,在SQL优化的末尾,StreamPhysicalTableSourceScan转化为StreamExecTableSourceScan的过程中[1],会执行一次getScanRuntimeProvider,此时的getScanRuntimeProvider一定是在applyFilters方法之后触发的。你可以尝试将filterFields记录在JdbcDynamicTableSource
> 这个类中,如果该值为空,则getScanRuntimeProvider
> 时无需拼接(在applyFilters执行之前一定是空的);当该值不为空的时候,在getScanRuntimeProvider
> 进行拼接(最后一次physical node转exec node时,一定执行过applyFilters方法)。[1]
> https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java#L83


Re:实现SupportsFilterPushDown接口过程中遇到的问题

2022-06-01 文章 Xuyang
Hi,在SQL优化的末尾,StreamPhysicalTableSourceScan转化为StreamExecTableSourceScan的过程中[1],会执行一次getScanRuntimeProvider,此时的getScanRuntimeProvider一定是在applyFilters方法之后触发的。




你可以尝试将filterFields记录在JdbcDynamicTableSource 这个类中,如果该值为空,则getScanRuntimeProvider 
时无需拼接(在applyFilters执行之前一定是空的);当该值不为空的时候,在getScanRuntimeProvider 
进行拼接(最后一次physical node转exec node时,一定执行过applyFilters方法)。




[1] 
https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java#L83




--

Best!
Xuyang





在 2022-06-01 20:03:58,"朱育锋"  写道:

Hi,在SQL优化的末尾,StreamPhysicalTableSourceScan转化为StreamExecTableSourceScan的过程中[1],会执行一次getScanRuntimeProvider,此时的getScanRuntimeProvider一定是在applyFilters方法之后触发的。你可以尝试将filterFields记录在JdbcDynamicTableSource
 这个类中,如果该值为空,则getScanRuntimeProvider 
时无需拼接(在applyFilters执行之前一定是空的);当该值不为空的时候,在getScanRuntimeProvider 
进行拼接(最后一次physical node转exec node时,一定执行过applyFilters方法)。[1] 
https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java#L83

Re:关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 Xuyang
Hi, 代码的话可以参考[1],由于agg的相关代码走的是codegen,推荐通过debug相关的测试类到附近。然后观察生成的代码。[1] 
https://github.com/apache/flink/blob/95e378e6565eea9b6b83702645e99733c33a957a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregateBase.scala#L171
在 2022-06-01 15:41:05,"hdxg1101300...@163.com"  写道:
>您好:
>   最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
>   比如这样一条sql语句:
> select 
>dim,
>count(*) as pv,
>sum(price) as sum_price,
>max(price) as max_price,
>min(price) as min_price,
>-- 计算 uv 数
>count(distinct user_id) as uv,
>UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS 
> STRING)) * 1000  as window_start
>from source_table
>group by
>dim,
>tumble(row_time, interval '1' minute);
>在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
>如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 
>aggregate(AggregateFunction aggFunction, WindowFunction 
>windowFunction) 
>是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
>谢谢!
>
>
>hdxg1101300...@163.com


实现SupportsFilterPushDown接口过程中遇到的问题

2022-06-01 文章 朱育锋
Hello Everyone

1. 我的目的是:使Flink Jdbc Connector支持谓词下推

2. 实现过程/背景:
我修改了Flink Jdbc Connector的代码,使 JdbcDynamicTableSource 类实现了 
SupportsFilterPushDown 接口。
在SupportsFilterPushDown.applyFilters(List filters) 
方法中我解析了Expression并转换成了类似下面的过滤条件
https://gist.github.com/zhuyufeng0809/f7e715d5c083e9674190544b7c8ef70d 
。
然后想将过滤条件拼接到 JdbcDynamicTableSource. getScanRuntimeProvider 
方法中的SQL查询语句上,从而实现谓词下推。

3. 现在遇到的问题是:
Flink Jdbc 
Connector运行时先调用的是getScanRuntimeProvider方法,后调用的是applyFilters方法。所以调用applyFilters方法生成过滤条件的时候,getScanRuntimeProvider方法已经被调用过了,无法再拼接过滤条件
目前卡在这里,想请教各位大佬如何才能将过滤条件拼接上去
4. Flink版本:1.14.3

Re: 关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 陈铜玖
Dear:
您可以首先建一个这样的对象
class Acc{
 long sum;
 long max;
 long min;
 ...
}


在 AggregateFunction 里面维护这样的 ACC ,
就可以在 add 方法里面对维护的 acc 和新传入的值之间实现多需求下的结果更新。


不知道你想了解的是不是这个意思



--Original--
From: "LincolnLee"

Re: 关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 Lincoln Lee
flink sql 的实现可以参考下 flink-table planner 部分的代码
从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如
datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑

Best,
Lincoln Lee


hdxg1101300...@163.com  于2022年6月1日周三 15:49写道:

> 您好:
>最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
>比如这样一条sql语句:
>  select
> dim,
> count(*) as pv,
> sum(price) as sum_price,
> max(price) as max_price,
> min(price) as min_price,
> -- 计算 uv 数
> count(distinct user_id) as uv,
> UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS
> STRING)) * 1000  as window_start
> from source_table
> group by
> dim,
> tumble(row_time, interval '1' minute);
> 在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
> 如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数
> aggregate(AggregateFunction aggFunction, WindowFunction W> windowFunction)
>
> 是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
> 谢谢!
>
>
> hdxg1101300...@163.com
>


关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 hdxg1101300...@163.com
您好:
   最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
   比如这样一条sql语句:
 select 
dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) 
* 1000  as window_start
from source_table
group by
dim,
tumble(row_time, interval '1' minute);
在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 
aggregate(AggregateFunction aggFunction, WindowFunction 
windowFunction) 
是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
谢谢!


hdxg1101300...@163.com