Hi YuXia,

Thanks for your advice.


With the hint added, the type validation passes.
But I still can't pass the function to this UDF.
Here is my query:


select array_transform(ids, id -> id +1) from tmp_table


The lambda function id -> id + 1 can't be passed because "->" is not supported by Calcite right now.


Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "- >" at line 3, column 40.
    at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)

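For now, one workaround I can think of is to bake the transformation into the UDF itself, so the SQL never needs the "->" lambda at all. A rough sketch (the class name and the example SQL name below are just placeholders, not tested code):

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

// Sketch of a workaround: the transformation (here, adding 1) is hard-coded in the
// UDF, so no function-typed parameter has to be passed from SQL.
class AddOneToArray extends ScalarFunction {
  def eval(@DataTypeHint("ARRAY<BIGINT>") ids: Array[Long]): Array[Long] =
    ids.map(_ + 1)
}

// After registering it, the query would become something like:
//   SELECT add_one_to_array(ids) FROM tmp_table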

Original Email

Sender: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
Sent Time: 2023/2/20 10:00
To: "Xuekui" <baixue...@foxmail.com>
Cc: "fskmine" <fskm...@gmail.com>; "Caizhi Weng" <tsreape...@gmail.com>; "User" <user@flink.apache.org>
Subject: Re: Flink SQL support array transform function


Hi, Xuekui.
As said in the exception stack, maybe you can try to provide a hint for the function's parameters.




import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

class ArrayTransformFunction extends ScalarFunction {

  def eval(@DataTypeHint("ARRAY<BIGINT>") a: Array[Long],
           @DataTypeHint("RAW") function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}
Hope it can help.
For more details, please refer to the Flink docs [1].

[1]:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
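In case it helps, registering the hinted function so it can be referenced from SQL might look roughly like this (the environment setup and the registered name are only illustrative, not taken from your code):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Illustrative sketch: create a table environment and register the UDF under a
// name that SQL queries can then refer to.
val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
tEnv.createTemporarySystemFunction("array_transform", classOf[ArrayTransformFunction])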




Best regards,
Yuxia



From: "Xuekui" <baixue...@foxmail.com>
To: "fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>
Cc: "User" <user@flink.apache.org>
Sent Time: Thursday, February 16, 2023, 10:54:05 AM
Subject: Re: Flink SQL support array transform function



Hi Caizhi,

I've tried to write a UDF to support this function, but I found I can't pass the function parameter to the UDF because the data type of the function is not supported. An exception is thrown during SQL validation.

My UDF code:

import org.apache.flink.table.functions.ScalarFunction

class ArrayTransformFunction extends ScalarFunction {

  def eval(a: Array[Long], function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}
Exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'transform'.
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
        at SQLTest$.main(SQLTest.scala:44)
        at SQLTest.main(SQLTest.scala)
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'transform'.
        at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
        at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
        at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
        at java.util.Optional.flatMap(Optional.java:241)
        at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
        at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
        ... 6 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'udf.ArrayTransformFunction'. Please check for implementation mistakes and/or provide a corresponding hint.
        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
        at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
        at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
        at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
        at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
        ... 17 more
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
        at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
        at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
        ... 20 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
        ... 22 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'scala.Function1<java.lang.Object, java.lang.Object>' in parameter 1 of method 'eval' in class 'udf.ArrayTransformFunction'. Please pass the required data type manually or allow RAW types.
        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
        at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:220)
        at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:198)
        at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:174)
        at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:128)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385)
        at java.util.Optional.orElseGet(Optional.java:267)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383)
        at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
        at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
        at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:387)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:364)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:324)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
        ... 23 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'scala.Function1<java.lang.Object, java.lang.Object>'. Interpreting it as a structured type was also not successful.
        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
        at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:270)
        at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:212)
        ... 43 more
Caused by: org.apache.flink.table.api.ValidationException: Class 'scala.Function1' must not be abstract.
        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
        at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:162)
        at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:453)
        at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:268)
        ... 44 more


Do you have any advice on this UDF?


Thanks


------------------ Original ------------------
From: "Shengkai Fang" <fskm...@gmail.com>
Sent Time: Tuesday, Aug 3, 2021 8:51 PM
To: "Caizhi Weng" <tsreape...@gmail.com>
Cc: "Xuekui" <baixue...@foxmail.com>; "user" <user@flink.apache.org>
Subject: Re: Flink SQL support array transform function


Hi, Caizhi. Do you think we should support this? Maybe we can open a JIRA for this, or align with Spark to support more useful built-in functions.



Caizhi Weng <tsreape...@gmail.com> wrote on Tuesday, August 3, 2021 at 3:42 PM:

Hi!
Currently there is no such built-in function in Flink SQL. You can try to write 
your own user-defined function[1] to achieve this.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/


Xuekui <baixue...@foxmail.com> wrote on Tuesday, August 3, 2021 at 3:22 PM:

Hi,

I'm using Flink SQL and need to do some transformation on an array column, just like the Spark SQL transform function:
https://spark.apache.org/docs/latest/api/sql/index.html#transform
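In Spark SQL, the usage looks roughly like this (example adapted from the linked docs):

SELECT transform(array(1, 2, 3), x -> x + 1);
-- returns [2, 3, 4]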

I found it's not supported by Flink SQL. Is there any plan for it?


Thank you
