*Flink 1.11*:在 TableEnvironment 环境中注册并使用自定义的 Aggregate Function 时,报出以下错误,代码贴在错误信息之后。(若改为在 StreamTableEnvironment 中注册和使用则一切正常,这应该说明自定义函数本身是没有问题的。)
org.apache.flink.table.api.TableException: Aggregate functions are not updated to the new type system yet. at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84) at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211) at java.util.function.Function.lambda$andThen$1(Function.java:88) at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178) at 
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511) at org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685) at com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48) at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36) *// 以下是代码* // main EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment tEnv = TableEnvironment.create(envSettings); // 注册source table, jdbc table source tEnv.executeSql("CREATE TABLE wx_event_log (....) with ('connect.type'='jdbc'),...."); // 注册sink table,csv table sink tEnv.executeSql("CREATE TABLE wx_data_statistics (....) 
with ('connect.type'='filesystem','format.type'='csv',.....)"); // 注册agg function tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc()); Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log where msg_type='text' and create_time between '2020-03-20' and '2020-03-21'"); table2.groupBy($("from_user")) .aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today")) .select($("from_user"),$("first_send_msg_today")) .executeInsert("wx_data_statistics"); // 自定义agg function类 public class FirstSendMsgFunc extends AggregateFunction<LocalDateTime,CountDTO> { public void accumulate(CountDTO acc, LocalDateTime createTime) { if (acc.getDateTime() == null) { acc.setDateTime(createTime); } else if (acc.getDateTime().isAfter(createTime)) { acc.setDateTime(createTime); } } @Override public LocalDateTime getValue(CountDTO acc) { return acc.getDateTime(); } @Override public CountDTO createAccumulator() { return new CountDTO(); } } // accumulate pojo 类 public class CountDTO implements Serializable { private Integer count; private LocalDateTime dateTime; public Integer getCount() { return count; } public void setCount(Integer count) { this.count = count; } public LocalDateTime getDateTime() { return dateTime; } public void setDateTime(LocalDateTime dateTime) { this.dateTime = dateTime; } } -- Sent from: http://apache-flink.147419.n8.nabble.com/