Hi,

看起来你并没有实现`retract` 方法,正常来讲,over window在处理过期数据的时候,会将过期的数据进行一次retract计算。
所以你需要正确的实现一下retract方法。

chen310 <22221...@163.com> 于2020年9月14日周一 上午10:01写道:

> flink版本 1.11.1
>
> 实现了一个UDAF聚集函数,将窗口内某些字段合并成一个字符串。代码如下:
>
> public class AggDistinctDetail extends AggregateFunction<String,
> AggDistinctDetail.Details> {
>     private static final Logger logger =
> LoggerFactory.getLogger(AggDistinctDetail.class);
>
>     public static class Details {
>         public Set<String> set;
>     }
>
>     @Override
>     public Details createAccumulator() {
>         return new Details();
>     }
>
>     @Override
>     public String getValue(Details acc) {
>         return JSON.toJSONString(acc.set);
>     }
>
>     public void accumulate(Details acc, String val) {
>         if (acc.set == null) {
>             acc.set = new HashSet<>();
>         }
>         acc.set.add(val);
>     }
>
>     public void retract(Details acc, String val) {
>         //now, agg detail don't need support retraction
>     }
>
>     public void merge(Details acc, Iterable<Details> it) {
>         Iterator<Details> iter = it.iterator();
>         if (acc.set == null) {
>             acc.set = new HashSet<>();
>         }
>         while (iter.hasNext()) {
>             Details a = iter.next();
>             acc.set.addAll(a.set);
>         }
>     }
>
>     public void resetAccumulator(Details acc) {
>         acc.set = null;
>     }
> }
>
> 将此UDAF使用在over窗口上,此窗口按realIp分区,以消息中事件时间(EventTime)
> requestDateTime向前推24小时作为窗口,统计窗口内realIp对应的所有userId,作为明细输出userId聚集后的字符串。
>
> drop function if exists UDF_InfoDistinctMerge;
> create function UDF_InfoDistinctMerge AS
> 'com.binance.risk.flink.udf.AggDistinctDetail';
>
> select
> realIp ,
> UDF_InfoDistinctMerge(userId) over w1 as userSet
> from source_table
> window w1 as (partition by realIp order by requestDateTime asc RANGE
> BETWEEN
> INTERVAL '24' hour preceding AND CURRENT ROW) ;
>
> 实际测试下来,发现聚集后的字符串userSet是一直在增长,即使窗口时间已经超过24小时,依然被聚集到userSet这个结果中,这和预期不符。
>
> 问题:
> 是上面UDAF的实现有啥问题么?还是UDAF在over窗口上有bug?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li

回复