Dear:
您可以首先建一个这样的对象
class Acc{
   long sum;
   long max;
   long min;
   ...
}


在 AggregateFunction 里面维护这样的 ACC ,
就可以在 add 方法里面对维护的 acc 和新传入的值之间实现多需求下的结果更新。


不知道你想了解的是不是这个意思
 
 
 
------------------ Original ------------------
From: &nbsp;"Lincoln&nbsp;Lee"<lincoln.8...@gmail.com&gt;;
Date: &nbsp;Wed, Jun 1, 2022 04:53 PM
To: &nbsp;"user-zh"<user-zh@flink.apache.org&gt;; 

Subject: &nbsp;Re: 关于flinksql聚合函数实现的学习疑问

&nbsp;

flink sql 的实现可以参考下 flink-table planner&amp;runtime 部分的代码
从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如
datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑

Best,
Lincoln Lee


hdxg1101300...@163.com <hdxg1101300...@163.com&gt; 于2022年6月1日周三 15:49写道:

&gt; 您好:
&gt;&nbsp;&nbsp;&nbsp; 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
&gt;&nbsp;&nbsp;&nbsp; 比如这样一条sql语句:
&gt;&nbsp; select
&gt;&nbsp;&nbsp;&nbsp;&nbsp; dim,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; count(*) as pv,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; sum(price) as sum_price,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; max(price) as max_price,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; min(price) as min_price,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; -- 计算 uv 数
&gt;&nbsp;&nbsp;&nbsp;&nbsp; count(distinct user_id) as uv,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; UNIX_TIMESTAMP(CAST(tumble_start(row_time, 
interval '1' minute) AS
&gt; STRING)) * 1000&nbsp; as window_start
&gt; from source_table
&gt; group by
&gt;&nbsp;&nbsp;&nbsp;&nbsp; dim,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; tumble(row_time, interval '1' minute);
&gt; 在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
&gt; 如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数
&gt; aggregate(AggregateFunction<T, ACC, V&gt; aggFunction, WindowFunction<V, 
R, K,
&gt; W&gt; windowFunction)
&gt;
&gt; 
是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
&gt; 谢谢!
&gt;
&gt;
&gt; hdxg1101300...@163.com
&gt;

回复