[ 
https://issues.apache.org/jira/browse/FLINK-35637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35637:
-----------------------------------
    Labels: pull-request-available  (was: )

> ScalarFunctionCallGen does not handle complex argument type properly
> --------------------------------------------------------------------
>
>                 Key: FLINK-35637
>                 URL: https://issues.apache.org/jira/browse/FLINK-35637
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Qinghui Xu
>            Priority: Major
>              Labels: pull-request-available
>
> When calling a UDF whose argument is declared as `Array<RowData>`, the 
> following error is raised:
> {code:java}
> java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.table.data.RawValueData
> at org.apache.flink.table.data.GenericArrayData.getRawValue(GenericArrayData.java:223)
> at org.apache.flink.table.data.ArrayData.lambda$createElementGetter$95d74a6c$1(ArrayData.java:224)
> at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1223)
> at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:106)
> at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1175)
> at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1115)
> at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:419)
> at StreamExecCalc$1560.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
> at MyUDFExpectingRowDataArray$$anonfun$run$1.apply
> at MyUDFExpectingRowDataArray$$anonfun$run$1.apply
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at com.criteo.featureflow.flink.datadisco.test.JsonFileRowDataSource.run(TestBlinkGlupTableSource.scala:65)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>  {code}
> After digging into `ScalarFunctionCallGen`, it turns out that it treats the 
> argument as a `RAW` type when it should be a `ROW`.
> The root cause seems to be that the codegen relies solely on the 
> `ScalarFunction` signature to infer the argument type, which yields only the 
> "external type". It should instead take the type of the operand into account 
> and bridge it to the external type.
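
The mismatch described above can be illustrated with a minimal, self-contained sketch. It does not use Flink's API: `RowValue`, `RawValue`, `TypeRoot` and `readFirst` are hypothetical stand-ins, assumed here only to show why choosing the element accessor from a signature-derived `RAW` type fails on data that is really a `ROW`, while choosing it from the operand type works.

```java
// Toy model of the reported failure; none of these names are Flink classes.
final class ElementGetterSketch {

    // Hypothetical stand-in for an internal row value.
    static final class RowValue {
        final Object[] fields;
        RowValue(Object... fields) { this.fields = fields; }
    }

    // Hypothetical stand-in for an opaque raw value.
    static final class RawValue {
        final Object payload;
        RawValue(Object payload) { this.payload = payload; }
    }

    // The element type the accessor is generated for.
    enum TypeRoot { ROW, RAW }

    // Reads the first array element using the accessor implied by elementType.
    // The cast only succeeds when elementType matches the actual data.
    static String readFirst(TypeRoot elementType, Object[] array) {
        Object element = array[0];
        switch (elementType) {
            case ROW:
                return "row with " + ((RowValue) element).fields.length + " field(s)";
            case RAW:
                return "raw " + ((RawValue) element).payload;
            default:
                throw new IllegalArgumentException("unsupported type: " + elementType);
        }
    }

    public static void main(String[] args) {
        Object[] array = { new RowValue(1, "a") };

        // Accessor chosen from the operand type (ROW): matches the data.
        System.out.println(readFirst(TypeRoot.ROW, array));

        // Accessor chosen from a signature-derived RAW type: the cast fails,
        // analogous to the ClassCastException in the trace.
        try {
            readFirst(TypeRoot.RAW, array);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: RAW accessor applied to ROW data");
        }
    }
}
```

In this toy model the data never changes; only the type used to pick the accessor does, which is why deriving it from the operand and then bridging to the external type avoids the failed cast.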



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
