Hi, All.
请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家: 版本: 1.13.1 运行模式: IDE-application --------------------------------------------------------------- about udf define... public static class UDFAggregateFunction extends AggregateFunction<Double, AccumulatorBean> { //返回最终结果 @Override public Double getValue(AccumulatorBean acc) { return acc.totalPrice / acc.totalNum; } //构建保存中间结果的对象 @Override public AccumulatorBean createAccumulator() { return new AccumulatorBean(); } //减去要撤回的值 public void retract(AccumulatorBean acc, double price, long num) { acc.totalPrice -= price * num; acc.totalNum -= num; } //从每个分区把数据取出来然后合并 public void merge(AccumulatorBean acc, Iterable<AccumulatorBean> it) { Iterator<AccumulatorBean> iter = it.iterator(); while (iter.hasNext()) { AccumulatorBean a = iter.next(); this.accumulate(acc, a.totalPrice, a.totalNum); } } //重置内存中值时调用 public void resetAccumulator(AccumulatorBean acc) { acc.totalNum = 0; acc.totalPrice = 0; } //和传入数据进行计算的逻辑 public void accumulate(AccumulatorBean acc, double price, long num) { acc.totalPrice += price * num; acc.totalNum += num; } } ------------------------------------------------------------------------------------------------------------ About main calling.... //TODO 流批一体的 Table API TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); List<Row> dataList = new ArrayList<>(); dataList.add(Row.of("张三", "可乐", 20.0D, 4L)); dataList.add(Row.of("张三", "果汁", 10.0D, 4L)); dataList.add(Row.of("李四", "咖啡", 10.0D, 2L)); Table table = tableEnvironment.fromValues(DataTypes.ROW( DataTypes.FIELD("user", DataTypes.STRING()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("price", DataTypes.DOUBLE()), DataTypes.FIELD("num", DataTypes.BIGINT()) ), dataList); tableEnvironment.createTemporaryView("orders", table); tableEnvironment.createTemporaryFunction("c_agg", new UDFAggregateFunction()); tableEnvironment.executeSql("select user, c_agg(price, num) as udf_field from orders group by user").print(); 异常堆栈--------------------------------------------------------------------------------- default_catalog.default_database.c_agg(DOUBLE, BIGINT) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) at com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139) Caused by: org.apache.flink.table.api.ValidationException: Invalid function call: default_catalog.default_database.c_agg(DOUBLE, BIGINT) at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89) at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679) at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726) at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:420) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4060) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3346) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:996) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:974) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:951) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152) ... 5 more Caused by: org.apache.flink.table.api.ValidationException: Invalid input arguments. Expected signatures are: default_catalog.default_database.c_agg(price => DOUBLE NOT NULL, num => BIGINT NOT NULL) at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:181) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:124) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:86) ... 31 more Caused by: org.apache.flink.table.api.ValidationException: Invalid argument type at position 0. Data type DOUBLE NOT NULL expected but DOUBLE passed. at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137) at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:101) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:122) ... 32 more 我单步调试跟踪了一下信息,发现这个类型和方法签名是可以对的上的。 备注: 使用 tableEnvironment.registerFunction("c_agg", new UDFAggregateFunction()); 就没有问题。可以正常运行。 谢谢。