Hi,


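We ran into the same problem. Since Flink 1.11, a custom UDF needs annotations for type inference whenever the types cannot be extracted automatically. See the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/#type-inference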


Hope you get this resolved soon.

Chuang Li
jasonlee1...@163.com


On December 21, 2021 at 09:41, Caizhi Weng <tsreape...@gmail.com> wrote:
Hi!

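Is this custom type used as the accumulator or as the value being aggregated? If it is the accumulator, it should not raise an error; could you share the UDAF code? If it is the value being aggregated, custom types currently only support POJOs. The requirements for a POJO are described in [1].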

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos
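
To make the POJO requirement a bit more concrete, here is a minimal sketch in plain Java. It is not Flink's extraction code; it only approximates the field-readability rule that the innermost exception in the quoted trace reports (a field must be public or have a public getter). FieldReadabilityCheck, GoodAccumulator and OpaqueBitmap are hypothetical names, and OpaqueBitmap merely stands in for a third-party class with private internals such as Roaring64NavigableMap.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Flink. */
final class FieldReadabilityCheck {

    /** Returns the names of instance fields that are neither public nor exposed by a public getter. */
    static List<String> unreadableFields(Class<?> type) {
        List<String> problems = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Static, transient and compiler-generated fields are not part of the data.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            if (Modifier.isPublic(mods) || hasPublicGetter(type, field)) {
                continue;
            }
            problems.add(field.getName());
        }
        return problems;
    }

    /** Looks for a public getX() or isX() whose return type fits the field type. */
    private static boolean hasPublicGetter(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        for (String candidate : new String[] {"get" + suffix, "is" + suffix}) {
            try {
                Method getter = type.getMethod(candidate); // public methods only
                if (field.getType().isAssignableFrom(getter.getReturnType())) {
                    return true;
                }
            } catch (NoSuchMethodException ignored) {
                // Try the next naming convention.
            }
        }
        return false;
    }
}

/** A POJO-style accumulator: default constructor, public field or getter/setter pair. */
final class GoodAccumulator {
    public long sum;
    private long count;

    public GoodAccumulator() {}

    public long getCount() { return count; }

    public void setCount(long count) { this.count = count; }
}

/** Stand-in for a library class whose state is private and has no getters. */
final class OpaqueBitmap {
    private long[] words = new long[0];
}

final class PojoCheckDemo {
    public static void main(String[] args) {
        System.out.println(FieldReadabilityCheck.unreadableFields(GoodAccumulator.class)); // prints []
        System.out.println(FieldReadabilityCheck.unreadableFields(OpaqueBitmap.class));    // prints [words]
    }
}
```

A class like OpaqueBitmap cannot be turned into a POJO from the outside, which is why the error message suggests passing the data type manually or allowing RAW types instead.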

On Monday, December 20, 2021 at 17:00, 陈卓宇 <2572805...@qq.com.invalid> wrote:

My custom UDAF implementation uses some data types that Flink SQL does not support.
How can I implement a custom data type for this case?







Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'Average'.
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
    at com.analysys.avg2.main(avg2.java:70)
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'Average'.
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
    at java.util.Optional.flatMap(Optional.java:241)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
    at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
    ... 5 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.analysys.avg2$Average'. Please check for implementation mistakes and/or provide a corresponding hint.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
    at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
    ... 16 more
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to accumulator mapping.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
    ... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
public void com.analysys.avg2$Average.accumulate(com.analysys.avg2$SumCount,java.lang.Integer)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124)
    ... 21 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class com.analysys.avg2$SumCount' in generic class 'org.apache.flink.table.functions.AggregateFunction' in class com.analysys.avg2$Average. Please pass the required data type manually or allow RAW types.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
    ... 22 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class com.analysys.avg2$SumCount'. Interpreting it as a structured type was also not successful.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
    ... 29 more
Caused by: org.apache.flink.table.api.ValidationException: Error in field 'map' of class 'com.analysys.avg2$SumCount'.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredTypeFields(DataTypeExtractor.java:540)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:514)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
    ... 30 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class org.roaringbitmap.longlong.Roaring64NavigableMap' in generic class 'org.apache.flink.table.functions.AggregateFunction' in class com.analysys.avg2$Average. Please pass the required data type manually or allow RAW types.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredTypeFields(DataTypeExtractor.java:537)
    ... 32 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class org.roaringbitmap.longlong.Roaring64NavigableMap'. Interpreting it as a structured type was also not successful.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
    ... 34 more
Caused by: org.apache.flink.table.api.ValidationException: Field 'highToBitmap' of class 'org.roaringbitmap.longlong.Roaring64NavigableMap' is neither publicly accessible nor does it have a corresponding getter method.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:357)
    at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability(ExtractionUtils.java:522)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.lambda$extractStructuredType$0(DataTypeExtractor.java:493)
    at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
    at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1351)
    at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
    at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
    at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:454)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:491)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
    ... 35 more





陈卓

