目前应该是不支持,一个替代方案是利用concat函数将数组转成string作为输入,再在你的UDF中拆成数组进行处理。
在 2023-02-15 16:29:19,"723849736" <723849...@qq.com.INVALID> 写道: >大家好, > >我在用flink sql的时候有一个场景,就是需要对数组中的某一列做变换,类似于spark sql中的tranform函数 > > >https://spark.apache.org/docs/latest/api/sql/index.html#transform > > >目前flink sql好像不支持类似的功能,这个功能用UDF能实现吗? > > >因为这个函数需要传入一个函数作为输入,函数类型的参数不是flink的data type,validate阶段会抛异常, 这个有办法解决吗? > > >class ArrayTransformFunction extends ScalarFunction { > > def eval(a: Array[Long], function: Long => Long): Array[Long] = { > a.map(e => function(e)) > }} >异常信息如下 > > >Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL >validation failed. An error occurred in the type inference logic of function >'transform'. > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660) > at SQLTest$.main(SQLTest.scala:44) > at SQLTest.main(SQLTest.scala) >Caused by: org.apache.flink.table.api.ValidationException: An error occurred >in the type inference logic of function 'transform'. > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) > at java.util.Optional.flatMap(Optional.java:241) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98) > at > org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147) > ... 6 more >Caused by: org.apache.flink.table.api.ValidationException: Could not extract a >valid type inference for function class 'udf.ArrayTransformFunction'. Please >check for implementation mistakes and/or provide a corresponding hint. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83) > at > org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160) > ... 17 more >Caused by: org.apache.flink.table.api.ValidationException: Error in extracting >a signature to output mapping. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148) > ... 20 more >Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a >type inference from method: >public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114) > ... 22 more >Caused by: org.apache.flink.table.api.ValidationException: Could not extract a >data type from 'scala.Function1<java.lang.Object, java.lang.Object>' in >parameter 1 of method 'eval' in class 'udf.ArrayTransformFunction'. Please >pass the required data type manually or allow RAW types. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:220) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:198) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:174) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:128) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385) > at java.util.Optional.orElseGet(Optional.java:267) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383) > at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) > at > java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110) > at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:387) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:364) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:324) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169) > ... 23 more >Caused by: org.apache.flink.table.api.ValidationException: Could not extract a >data type from 'scala.Function1<java.lang.Object, java.lang.Object>'. >Interpreting it as a structured type was also not successful. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:270) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:212) > ... 43 more >Caused by: org.apache.flink.table.api.ValidationException: Class >'scala.Function1' must not be abstract. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328) > at > org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:162) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:453) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:268) > ... 44 more