Hi, 代码的话可以参考[1],由于agg的相关代码走的是codegen,推荐通过debug相关的测试类到附近。然后观察生成的代码。<br/><br/>[1] 
https://github.com/apache/flink/blob/95e378e6565eea9b6b83702645e99733c33a957a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregateBase.scala#L171
在 2022-06-01 15:41:05,"hdxg1101300...@163.com" <hdxg1101300...@163.com> 写道:
>您好:
>   最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
>   比如这样一条sql语句:
> select 
>    dim,
>    count(*) as pv,
>    sum(price) as sum_price,
>    max(price) as max_price,
>    min(price) as min_price,
>    -- 计算 uv 数
>    count(distinct user_id) as uv,
>    UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS 
> STRING)) * 1000  as window_start
>from source_table
>group by
>    dim,
>    tumble(row_time, interval '1' minute);
>在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
>如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 
>aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> 
>windowFunction) 
>是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
>谢谢!
>
>
>hdxg1101300...@163.com

回复