Hi, 看了你的 UDAF 的实现,理论上是不可能比内置 count/sum 快的。可能是哪里有 bug 或是测试方式不对。 我想先问几个问题: 1. 是基于哪个版本,哪个 planner 进行的测试? 2. 流计算模式还是批计算模式? 3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?
Best, Jark On Tue, 28 Apr 2020 at 10:46, forideal <fszw...@163.com> wrote: > 大家好: > > > 我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG > 等等。 > 我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误) > > > SQL: > > > select > query_nor, > sum(cast (1asbigint))as query_nor_counter > from ods_search_track > groupby > query_nor, > HOP( > event_time,interval'30'SECOND,interval'30'MINUTE) > sum: > public class Sum extends AggregateFunction<Long, AtomicLong> { > > @Override > public boolean isDeterministic() { > return false; > } > > @Override > public AtomicLong createAccumulator() { > return new AtomicLong(); > } > > @Override > public void open(FunctionContext context) throws Exception { > > } > > @Override > public Long getValue(AtomicLong acc) { > return acc.get(); > } > > @Override > public TypeInformation getResultType() { > return Types.LONG; > } > > public void merge(AtomicLong acc, Iterable<AtomicLong> it) { > Iterator<AtomicLong> iter = it.iterator(); > while (iter.hasNext()) { > AtomicLong a = iter.next(); > acc.addAndGet(a.get()); > } > } > > public void accumulate(AtomicLong datas, Long data) { > datas.addAndGet(data); > } > } > > > 使用 Flink buildin COUNT > > > select > query_nor, > count(1) as query_nor_counter > from ods_search_track > groupby > query_nor, > HOP( > event_time,interval'30'SECOND,interval'30'MINUTE)