Re:Flink写入CK数据丢失问题

2022-06-01 文章 lxk
补充一下图片 https://s2.loli.net/2022/06/02/C5it4rPFgmJlopZ.png https://s2.loli.net/2022/06/02/3ri2HIv1RsAawBW.png https://s2.loli.net/2022/06/02/efVWPvXCYFwhgTp.png https://s2.loli.net/2022/06/02/9UptbNaWvs7xwXC.png 在 2022-06-02 11:38:24,"lxk" 写道: 各位,请教个问题 目前使用flink往ck写入数据,使用的是datastrea

Flink写入CK数据丢失问题

2022-06-01 文章 lxk
各位,请教个问题 目前使用flink往ck写入数据,使用的是datastream api以及rocksdb状态后端,程序中了开了两个窗口,都是10秒级别。同时还使用了sql进行group by 求和,求和的操作没有加窗口,同时streamtableenv 设置了状态生存时间为10s. 在跟离线端对比数据的时候发现,一段时间内的数据跟离线差异不大,从0点-17点(数据的事件时间),但是18点(事件时间)以后的数据实时端差异量特别大。 目前在webui上发现整个管理内存的使用率已经占满,不知道是否跟这个有关系。 还有一点现象是,今天的数据我们对比了ck上实时的表(正确的),总体数据量还是要小

Re: 实现SupportsFilterPushDown接口过程中遇到的问题

2022-06-01 文章 Shengkai Fang
Hi. 我记得 Jdbc Connector 实现了 ProjectionPushDown。你可以参考着实现。 xuyang 老师说的对,getScanRuntimeProvider 发生在 push down 之后。应该不会有你说的问题。另外,可以考虑贡献到社区[1],我们也可以帮忙一起 review 下,帮忙解决你的问题? Best, Shengkai [1] https://issues.apache.org/jira/browse/FLINK-19651 Xuyang 于2022年6月1日周三 23:47写道: > > Hi,在SQL优化的末尾,StreamPhysic

Re:实现SupportsFilterPushDown接口过程中遇到的问题

2022-06-01 文章 Xuyang
Hi,在SQL优化的末尾,StreamPhysicalTableSourceScan转化为StreamExecTableSourceScan的过程中[1],会执行一次getScanRuntimeProvider,此时的getScanRuntimeProvider一定是在applyFilters方法之后触发的。 你可以尝试将filterFields记录在JdbcDynamicTableSource 这个类中,如果该值为空,则getScanRuntimeProvider 时无需拼接(在applyFilters执行之前一定是空的);当该值不为空的时候,在getScanRuntimePro

Re:关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 Xuyang
Hi, 代码的话可以参考[1],由于agg的相关代码走的是codegen,推荐通过debug相关的测试类到附近。然后观察生成的代码。[1] https://github.com/apache/flink/blob/95e378e6565eea9b6b83702645e99733c33a957a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregateBase.scal

实现SupportsFilterPushDown接口过程中遇到的问题

2022-06-01 文章 朱育锋
Hello Everyone 1. 我的目的是:使Flink Jdbc Connector支持谓词下推 2. 实现过程/背景: 我修改了Flink Jdbc Connector的代码,使 JdbcDynamicTableSource 类实现了 SupportsFilterPushDown 接口。 在SupportsFilterPushDown.applyFilters(List filters) 方法中我解析了Expression并转换成了类似下面的过滤条件 https://gist.github.com/zhuyufeng0809/f7e715d5c083e

Re: 关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 陈铜玖
Dear: 您可以首先建一个这样的对象 class Acc{    long sum;    long max;    long min;    ... } 在 AggregateFunction 里面维护这样的 ACC , 就可以在 add 方法里面对维护的 acc 和新传入的值之间实现多需求下的结果更新。 不知道你想了解的是不是这个意思       -- Original -- From:  "Lincoln Lee"

Re: 关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 Lincoln Lee
flink sql 的实现可以参考下 flink-table planner&runtime 部分的代码 从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如 datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑 Best, Lincoln Lee hdxg1101300...@163.com 于2022年6月1日周三 15:49写道: > 您好: >最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4) >比如这样

关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 hdxg1101300...@163.com
您好: 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4) 比如这样一条sql语句: select dim, count(*) as pv, sum(price) as sum_price, max(price) as max_price, min(price) as min_price, -- 计算 uv 数 count(distinct user_id) as uv, UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1