应该是继承scalaFunction ?
















在 2021-02-22 10:25:31,"xiaoyue" <18242988...@163.com> 写道:
>捞一下自己,在线等大佬们的回复 _(:з」∠)_
>
>
>
>
>
>
>
>在 2021-02-20 13:14:18,"xiaoyue" <18242988...@163.com> 写道:
>
>我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function 
>signature prod(<NUMERIC>),请求大佬帮忙看看_(:з」∠)_
>
>以下是代码:
>-----------------------------------------------------
>...
>  stableEnv.createTemporarySystemFunction("prod", 
> ProductAggregateFunction.class);
>  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as 
> yldrate from queryData group by pf_id");
>...
>-----------------------------------------------------
>@FunctionHint(
>        input = @DataTypeHint("Double"),
>        output = @DataTypeHint("Double")
>)
>public class ProductAggregateFunction extends AggregateFunction<Double, 
>Product> {
>
>
>    @Override
>    public Double getValue(Product acc) {
>        return acc.prod;
>    }
>    @Override
>    public Product createAccumulator() {
>        return new Product();
>    }
>    public void accumulate(Product acc, Double iValue) {
>        acc.prod *= iValue;
>    }
>    public void retract(Product acc, Double iValue) {
>        acc.prod /= iValue;
>    }
>    public void merge(Product acc, Iterable<Product> it) {
>        for (Product p : it) {
>            accumulate(acc, p.prod);
>        }
>    }
>    public void resetAccumulator(Product acc) {
>        acc.prod = 1D;
>    }
>}
>
>
>
>
>
> 

回复