Re:关于flinksql聚合函数实现的学习疑问
Hi, 代码的话可以参考[1],由于agg的相关代码走的是codegen,推荐通过debug相关的测试类到附近。然后观察生成的代码。[1] https://github.com/apache/flink/blob/95e378e6565eea9b6b83702645e99733c33a957a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregateBase.scala#L171 在 2022-06-01 15:41:05,"hdxg1101300...@163.com" 写道: >您好: > 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4) > 比如这样一条sql语句: > select >dim, >count(*) as pv, >sum(price) as sum_price, >max(price) as max_price, >min(price) as min_price, >-- 计算 uv 数 >count(distinct user_id) as uv, >UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS > STRING)) * 1000 as window_start >from source_table >group by >dim, >tumble(row_time, interval '1' minute); >在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数; >如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 >aggregate(AggregateFunction aggFunction, WindowFunction >windowFunction) >是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。 >谢谢! > > >hdxg1101300...@163.com
Re: 关于flinksql聚合函数实现的学习疑问
Dear: 您可以首先建一个这样的对象 class Acc{ long sum; long max; long min; ... } 在 AggregateFunction 里面维护这样的 ACC , 就可以在 add 方法里面对维护的 acc 和新传入的值之间实现多需求下的结果更新。 不知道你想了解的是不是这个意思 --Original-- From: "LincolnLee"
Re: 关于flinksql聚合函数实现的学习疑问
flink sql 的实现可以参考下 flink-table planner 部分的代码 从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如 datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑 Best, Lincoln Lee hdxg1101300...@163.com 于2022年6月1日周三 15:49写道: > 您好: >最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4) >比如这样一条sql语句: > select > dim, > count(*) as pv, > sum(price) as sum_price, > max(price) as max_price, > min(price) as min_price, > -- 计算 uv 数 > count(distinct user_id) as uv, > UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS > STRING)) * 1000 as window_start > from source_table > group by > dim, > tumble(row_time, interval '1' minute); > 在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数; > 如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 > aggregate(AggregateFunction aggFunction, WindowFunction W> windowFunction) > > 是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。 > 谢谢! > > > hdxg1101300...@163.com >
关于flinksql聚合函数实现的学习疑问
您好: 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4) 比如这样一条sql语句: select dim, count(*) as pv, sum(price) as sum_price, max(price) as max_price, min(price) as min_price, -- 计算 uv 数 count(distinct user_id) as uv, UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start from source_table group by dim, tumble(row_time, interval '1' minute); 在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数; 如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 aggregate(AggregateFunction aggFunction, WindowFunction windowFunction) 是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。 谢谢! hdxg1101300...@163.com