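Hi all,


I'd like to ask about a problem I ran into while evaluating an upgrade to the latest 1.13.1 API. Thanks in advance, everyone:


Version: 1.13.1
Execution mode: run as an application from the IDE
---------------------------------------------------------------
UDF definition: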


    public static class UDFAggregateFunction extends AggregateFunction<Double, AccumulatorBean> {


        // Return the final result
        @Override
        public Double getValue(AccumulatorBean acc) {
            return acc.totalPrice / acc.totalNum;
        }


        // Create the object that holds the intermediate result
        @Override
        public AccumulatorBean createAccumulator() {
            return new AccumulatorBean();
        }


        // Subtract the values being retracted
        public void retract(AccumulatorBean acc, double price, long num) {
            acc.totalPrice -= price * num;
            acc.totalNum -= num;
        }


        // Merge the partial accumulators from each partition into acc
        public void merge(AccumulatorBean acc, Iterable<AccumulatorBean> it) {
            for (AccumulatorBean a : it) {
                // a.totalPrice is already a sum of price * num, so add it directly;
                // routing it through accumulate() would multiply by num a second time
                acc.totalPrice += a.totalPrice;
                acc.totalNum += a.totalNum;
            }
        }


        // Called to reset the in-memory values
        public void resetAccumulator(AccumulatorBean acc) {
            acc.totalNum = 0;
            acc.totalPrice = 0;
        }


        // Fold one incoming row into the accumulator
        public void accumulate(AccumulatorBean acc, double price, long num) {
            acc.totalPrice += price * num;
            acc.totalNum += num;
        }
    }
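
AccumulatorBean is not shown in this post. The following is a minimal sketch, assuming it is a plain POJO with two public fields named as the UDF uses them (totalPrice, totalNum). The class name AccumulatorSketch and its static helpers are hypothetical; they only replay the accumulate/merge arithmetic in plain Java, without Flink, on the two rows for 张三 from the sample data. Here merge adds the partial totals directly, because a partial accumulator's totalPrice is already a sum of price * num.

```java
import java.util.Arrays;

// Hypothetical, Flink-free replay of the UDF arithmetic (not the poster's actual classes).
class AccumulatorSketch {

    // Assumed shape of the accumulator: two public mutable fields.
    static class AccumulatorBean {
        public double totalPrice; // running sum of price * num
        public long totalNum;     // running sum of num
    }

    // Same arithmetic as accumulate() in the UDF.
    static void accumulate(AccumulatorBean acc, double price, long num) {
        acc.totalPrice += price * num;
        acc.totalNum += num;
    }

    // Partial totals are added as-is; they must not be multiplied by num again.
    static void merge(AccumulatorBean acc, Iterable<AccumulatorBean> others) {
        for (AccumulatorBean other : others) {
            acc.totalPrice += other.totalPrice;
            acc.totalNum += other.totalNum;
        }
    }

    // Same arithmetic as getValue() in the UDF: average price per unit.
    static Double getValue(AccumulatorBean acc) {
        return acc.totalPrice / acc.totalNum;
    }

    public static void main(String[] args) {
        AccumulatorBean a = new AccumulatorBean();
        accumulate(a, 20.0, 4L); // first row for 张三
        AccumulatorBean b = new AccumulatorBean();
        accumulate(b, 10.0, 4L); // second row for 张三

        AccumulatorBean merged = new AccumulatorBean();
        merge(merged, Arrays.asList(a, b));
        System.out.println(getValue(merged)); // (80 + 40) / 8, prints 15.0
    }
}
```

For these two rows the expected average is (20 * 4 + 10 * 4) / (4 + 4) = 15.0, which gives a quick sanity check on any merge implementation.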


------------------------------------------------------------------------------------------------------------
Main method:
        // TODO: unified stream/batch Table API
        TableEnvironment tableEnvironment = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        List<Row> dataList = new ArrayList<>();
        dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
        dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
        dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
        Table table = tableEnvironment.fromValues(DataTypes.ROW(
            DataTypes.FIELD("user", DataTypes.STRING()),
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("price", DataTypes.DOUBLE()),
            DataTypes.FIELD("num", DataTypes.BIGINT())
            ),
            dataList);
        tableEnvironment.createTemporaryView("orders", table);


        tableEnvironment.createTemporaryFunction("c_agg", new UDFAggregateFunction());


        tableEnvironment.executeSql(
            "select user, c_agg(price, num) as udf_field from orders group by user").print();






Exception stack trace ---------------------------------------------------------------------------------




default_catalog.default_database.c_agg(DOUBLE, BIGINT)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    at com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
Caused by: org.apache.flink.table.api.ValidationException: Invalid function call:
default_catalog.default_database.c_agg(DOUBLE, BIGINT)
    at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
    at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
    at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
    at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
    at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:420)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4060)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3346)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:996)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:974)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:951)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
    ... 5 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid input arguments. Expected signatures are:
default_catalog.default_database.c_agg(price => DOUBLE NOT NULL, num => BIGINT NOT NULL)
    at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:181)
    at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:124)
    at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:86)
    ... 31 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid argument type at position 0. Data type DOUBLE NOT NULL expected but DOUBLE passed.
    at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
    at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:101)
    at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:122)
    ... 32 more




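I stepped through this in the debugger and, as far as I can tell, the argument types and the method signature do match.
Note: with the older call tableEnvironment.registerFunction("c_agg", new UDFAggregateFunction()); there is no problem and the job runs normally.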
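
A possible explanation, offered as an assumption rather than a confirmed diagnosis: with createTemporaryFunction the expected argument types are derived reflectively from the accumulate signature, and Java primitives such as double and long are extracted as DOUBLE NOT NULL and BIGINT NOT NULL, whereas columns declared with DataTypes.DOUBLE() and DataTypes.BIGINT() are nullable. Under that assumption, declaring the parameters as boxed Double and Long should make the expected types nullable. The sketch below is plain Java with no Flink on the classpath; the class name NullableArgsSketch, the method names, and the choice to skip null rows are all hypothetical, and in the real UDF the boxed method would simply be named accumulate inside the class extending AggregateFunction. The reflection helper only illustrates the primitive/boxed difference that a reflective type extractor can observe.

```java
import java.lang.reflect.Method;

// Hypothetical, Flink-free illustration of primitive vs. boxed accumulate parameters.
class NullableArgsSketch {

    // Assumed accumulator shape; the original post does not show AccumulatorBean.
    static class AccumulatorBean {
        public double totalPrice;
        public long totalNum;
    }

    // Primitive parameters, as in the posted UDF: they can never carry null.
    public static void accumulatePrimitive(AccumulatorBean acc, double price, long num) {
        acc.totalPrice += price * num;
        acc.totalNum += num;
    }

    // Boxed parameters: null is representable, so it has to be handled explicitly.
    public static void accumulateBoxed(AccumulatorBean acc, Double price, Long num) {
        if (price == null || num == null) {
            return; // assumption: rows with a missing value are skipped
        }
        acc.totalPrice += price * num;
        acc.totalNum += num;
    }

    // True if the price and num parameters of the named method are Java primitives.
    static boolean hasPrimitiveArgs(String methodName) {
        for (Method m : NullableArgsSketch.class.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                Class<?>[] params = m.getParameterTypes();
                return params[1].isPrimitive() && params[2].isPrimitive();
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }

    public static void main(String[] args) {
        System.out.println(hasPrimitiveArgs("accumulatePrimitive")); // true
        System.out.println(hasPrimitiveArgs("accumulateBoxed"));     // false

        AccumulatorBean acc = new AccumulatorBean();
        accumulateBoxed(acc, null, 4L);  // ignored: price is null
        accumulateBoxed(acc, 10.0, 2L);  // counted
        System.out.println(acc.totalPrice + " " + acc.totalNum); // 20.0 2
    }
}
```

If this reading is right, it would also be consistent with registerFunction still working, since that older registration path does not go through the same reflective type inference.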


Thanks.






 
