Hi, Jingsong.

     The latest type inference is stricter than in earlier versions, and its NOT NULL validation against the schema is also more thorough.
     In the example mentioned earlier, primitive types are used as the UDF parameters,
which means the columns fed to those parameters must be NOT NULL. However, when the view is created, every field type is nullable by default (the not-null constraint defaults to false), which leads to the problem described earlier.

Best wishes.

Best,
Roc


On 2021-06-29 11:02:55, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>Hi,
>
>You could create a JIRA ticket and let Timo take a look. The new type inference introduced for UDAFs may have a problem.
>
>Best,
>Jingsong
>
>On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal <flin...@126.com> wrote:
>
>>
>>
>> Hi, All.
>>
>>
>> I'd like to ask about a problem I ran into while investigating the API upgrade to the latest 1.13.1. Thanks, everyone:
>>
>>
>> Version: 1.13.1
>> Run mode: IDE-application
>> ---------------------------------------------------------------
>> About the UDF definition...
>>
>>
>>     public static class UDFAggregateFunction extends
>> AggregateFunction<Double, AccumulatorBean> {
>>
>>
>>         // Return the final result
>>         @Override
>>         public Double getValue(AccumulatorBean acc) {
>>             return acc.totalPrice / acc.totalNum;
>>         }
>>
>>
>>         // Create the object that holds the intermediate result
>>         @Override
>>         public AccumulatorBean createAccumulator() {
>>             return new AccumulatorBean();
>>         }
>>
>>
>>         // Subtract the value being retracted
>>         public void retract(AccumulatorBean acc, double price, long num) {
>>             acc.totalPrice -= price * num;
>>             acc.totalNum -= num;
>>         }
>>
>>
>>         // Take the data from each partition and merge it
>>         public void merge(AccumulatorBean acc, Iterable<AccumulatorBean>
>> it) {
>>
>>
>>             Iterator<AccumulatorBean> iter = it.iterator();
>>             while (iter.hasNext()) {
>>                 AccumulatorBean a = iter.next();
>>                 this.accumulate(acc, a.totalPrice, a.totalNum);
>>             }
>>         }
>>
>>
>>         // Called when the in-memory value is reset
>>         public void resetAccumulator(AccumulatorBean acc) {
>>             acc.totalNum = 0;
>>             acc.totalPrice = 0;
>>         }
>>
>>
>>         // The logic that computes with the incoming data
>>         public void accumulate(AccumulatorBean acc, double price, long
>> num) {
>>             acc.totalPrice += price * num;
>>             acc.totalNum += num;
>>         }
>>     }
>>
>>
>>
>> ------------------------------------------------------------------------------------------------------------
>> About main calling....
>> //TODO unified stream/batch Table API
>>         TableEnvironment tableEnvironment =
>> TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
>>         List<Row> dataList = new ArrayList<>();
>>         dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
>>         dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
>>         dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
>>         Table table = tableEnvironment.fromValues(DataTypes.ROW(
>>             DataTypes.FIELD("user", DataTypes.STRING()),
>>             DataTypes.FIELD("name", DataTypes.STRING()),
>>             DataTypes.FIELD("price", DataTypes.DOUBLE()),
>>             DataTypes.FIELD("num", DataTypes.BIGINT())
>>             ),
>>             dataList);
>>         tableEnvironment.createTemporaryView("orders", table);
>>
>>
>>         tableEnvironment.createTemporaryFunction("c_agg", new
>> UDFAggregateFunction());
>>
>>
>>         tableEnvironment.executeSql("select user, c_agg(price, num) as
>> udf_field from orders group by user").print();
>>
>>
>>
>>
>>
>>
>>
>> Exception stack trace ---------------------------------------------------------------------------------
>>
>>
>>
>>
>> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
>> at
>> com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
>> Caused by: org.apache.flink.table.api.ValidationException: Invalid
>> function call:
>> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
>> at
>> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
>> at
>> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
>> at
>> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
>> at
>> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
>> at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:420)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4060)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3346)
>> at
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:996)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:974)
>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:951)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>> ... 5 more
>> Caused by: org.apache.flink.table.api.ValidationException: Invalid input
>> arguments. Expected signatures are:
>> default_catalog.default_database.c_agg(price => DOUBLE NOT NULL, num =>
>> BIGINT NOT NULL)
>> at
>> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:181)
>> at
>> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:124)
>> at
>> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:86)
>> ... 31 more
>> Caused by: org.apache.flink.table.api.ValidationException: Invalid
>> argument type at position 0. Data type DOUBLE NOT NULL expected but DOUBLE
>> passed.
>> at
>> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
>> at
>> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:101)
>> at
>> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:122)
>> ... 32 more
>>
>>
>>
>>
>> I stepped through this with the debugger, and the types do match the method signature.
>> Note: with tableEnvironment.registerFunction("c_agg", new
>> UDFAggregateFunction()); there is no problem; it runs fine.
>>
>>
>> Thanks.
>>
>>
>>
>>
>>
>>
>>
>
>
>
>-- 
>Best, Jingsong Lee
