Dear: 您可以首先建一个这样的对象 class Acc{ long sum; long max; long min; ... }
在 AggregateFunction 里面维护这样的 ACC , 就可以在 add 方法里面对维护的 acc 和新传入的值之间实现多需求下的结果更新。 不知道你想了解的是不是这个意思 ------------------ Original ------------------ From: "Lincoln Lee"<lincoln.8...@gmail.com>; Date: Wed, Jun 1, 2022 04:53 PM To: "user-zh"<user-zh@flink.apache.org>; Subject: Re: 关于flinksql聚合函数实现的学习疑问 flink sql 的实现可以参考下 flink-table planner&runtime 部分的代码 从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如 datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑 Best, Lincoln Lee hdxg1101300...@163.com <hdxg1101300...@163.com> 于2022年6月1日周三 15:49写道: > 您好: > 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4) > 比如这样一条sql语句: > select > dim, > count(*) as pv, > sum(price) as sum_price, > max(price) as max_price, > min(price) as min_price, > -- 计算 uv 数 > count(distinct user_id) as uv, > UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS > STRING)) * 1000 as window_start > from source_table > group by > dim, > tumble(row_time, interval '1' minute); > 在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数; > 如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 > aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, > W> windowFunction) > > 是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。 > 谢谢! > > > hdxg1101300...@163.com >