*flink1.11*
在TableEnvironment环境中注册并使用自定义的AggregateFunction时,报出以下错误。
下面贴有代码(若是在StreamTableEnvironment中注册和使用则是正常的,这应该说明自定义的函数本身是ok的)

org.apache.flink.table.api.TableException: Aggregate functions are not
updated to the new type system yet.
        at
org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
        at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
        at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
        at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
        at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
        at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
        at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
        at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
        at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
        at
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
        at java.util.function.Function.lambda$andThen$1(Function.java:88)
        at
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
        at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
        at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
        at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
        at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
        at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
        at
org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
        at
org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
        at
com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
        at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)

*// 以下是代码*
// main
// Build environment settings: Blink planner, batch execution mode.
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();

// Create a plain TableEnvironment (not a StreamTableEnvironment) from those settings.
TableEnvironment tEnv = TableEnvironment.create(envSettings);

// Register the source table (JDBC table source); columns and remaining options are elided in this snippet.
tEnv.executeSql("CREATE TABLE wx_event_log (....) with
('connect.type'='jdbc'),....");

// Register the sink table (CSV table sink on the filesystem connector); details are elided in this snippet.
tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with
('connect.type'='filesystem','format.type'='csv',.....)");

// Register the aggregate function under the name "firSendMsgFunc".
tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc());

// Select sender and creation time of text messages whose create_time lies in the given range.
Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log
where msg_type='text' and create_time between '2020-03-20' and
'2020-03-21'");

// Group by from_user, aggregate create_time with firSendMsgFunc, and insert the result into the sink table.
// NOTE(review): per the stack trace in this post, the TableException is thrown from the select(...) call
// below (AggregatedTableImpl.select -> AggregateFunction.getTypeInference). Presumably the instance
// registered via createTemporarySystemFunction is resolved through the new type system - confirm
// against the Flink 1.11 documentation.
table2.groupBy($("from_user"))
       
.aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
        .select($("from_user"),$("first_send_msg_today"))
        .executeInsert("wx_data_statistics");


// 自定义agg function类
/**
 * Aggregate function that yields the earliest {@link LocalDateTime} accumulated for a group.
 *
 * <p>The running minimum is kept in the {@code dateTime} property of a {@link CountDTO}
 * accumulator. The result is {@code null} when no non-null value has been accumulated.
 */
public class FirstSendMsgFunc extends AggregateFunction<LocalDateTime, CountDTO> {

    /**
     * Folds one input value into the accumulator, keeping the earliest timestamp seen so far.
     *
     * <p>A {@code null} input is skipped. Previously a {@code null} arriving after a non-null
     * value had been stored caused a {@link NullPointerException} inside
     * {@link LocalDateTime#isAfter}.
     *
     * @param acc        accumulator holding the earliest timestamp so far (may hold {@code null})
     * @param createTime timestamp of the current row; ignored when {@code null}
     */
    public void accumulate(CountDTO acc, LocalDateTime createTime) {
        if (createTime == null) {
            // Nothing to compare against; leave the accumulator untouched.
            return;
        }
        LocalDateTime earliest = acc.getDateTime();
        if (earliest == null || earliest.isAfter(createTime)) {
            acc.setDateTime(createTime);
        }
    }

    /**
     * Returns the aggregation result.
     *
     * @param acc accumulator to read from
     * @return the earliest accumulated timestamp, or {@code null} if none was accumulated
     */
    @Override
    public LocalDateTime getValue(CountDTO acc) {
        return acc.getDateTime();
    }

    /**
     * Creates a fresh accumulator whose properties are all unset.
     *
     * @return a new, empty {@link CountDTO}
     */
    @Override
    public CountDTO createAccumulator() {
        return new CountDTO();
    }
}

// accumulate pojo 类
/**
 * Mutable accumulator POJO used by {@code FirstSendMsgFunc}.
 *
 * <p>Deliberately kept as a plain bean (implicit public no-arg constructor, getter/setter
 * pairs) rather than an immutable type, because the aggregate function updates it in place.
 */
public class CountDTO implements Serializable {

    /** Explicit serialization version so the serialized form does not depend on compiler details. */
    private static final long serialVersionUID = 1L;

    /** Counter value; {@code null} until set. Not touched by the aggregate function shown in this file. */
    private Integer count;

    /** Timestamp tracked by the aggregate function; {@code null} until set. */
    private LocalDateTime dateTime;

    /**
     * @return the current counter value, or {@code null} if it was never set
     */
    public Integer getCount() {
        return count;
    }

    /**
     * @param count new counter value (may be {@code null})
     */
    public void setCount(Integer count) {
        this.count = count;
    }

    /**
     * @return the stored timestamp, or {@code null} if it was never set
     */
    public LocalDateTime getDateTime() {
        return dateTime;
    }

    /**
     * @param dateTime new timestamp to store (may be {@code null})
     */
    public void setDateTime(LocalDateTime dateTime) {
        this.dateTime = dateTime;
    }
}





--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复