flink sql 的实现可以参考下 flink-table planner&runtime 部分的代码
从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如
datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑

Best,
Lincoln Lee


hdxg1101300...@163.com <hdxg1101300...@163.com> 于2022年6月1日周三 15:49写道:

> 您好:
>    最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
>    比如这样一条sql语句:
>  select
>     dim,
>     count(*) as pv,
>     sum(price) as sum_price,
>     max(price) as max_price,
>     min(price) as min_price,
>     -- 计算 uv 数
>     count(distinct user_id) as uv,
>     UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS
> STRING)) * 1000  as window_start
> from source_table
> group by
>     dim,
>     tumble(row_time, interval '1' minute);
> 在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
> 如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数
> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K,
> W> windowFunction)
>
> 是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
> 谢谢!
>
>
> hdxg1101300...@163.com
>

回复