Qinghui Xu created FLINK-35637:
----------------------------------
Summary: ScalarFunctionCallGen does not handle complex argument type properly
Key: FLINK-35637
URL: https://issues.apache.org/jira/browse/FLINK-35637
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Qinghui Xu
When using a UDF that expects an argument of type `Array<RowData>`, the following
error is raised:
```
java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.table.data.RawValueData
	at org.apache.flink.table.data.GenericArrayData.getRawValue(GenericArrayData.java:223)
	at org.apache.flink.table.data.ArrayData.lambda$createElementGetter$95d74a6c$1(ArrayData.java:224)
	at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1223)
	at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:106)
	at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1175)
	at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1115)
	at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:419)
	at StreamExecCalc$1560.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
	at MyUDFExpectingRowDataArray$$anonfun$run$1.apply
	at MyUDFExpectingRowDataArray$$anonfun$run$1.apply
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at com.criteo.featureflow.flink.datadisco.test.JsonFileRowDataSource.run(TestBlinkGlupTableSource.scala:65)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
```
After digging into `ScalarFunctionCallGen`, it turns out that it treats the
argument as a `RAW` type when it should be a `ROW`.
The root cause seems to be that the code generator relies solely on the
`ScalarFunction` signature, i.e. the "external type", to infer the argument
type. It should instead take the operand's type into account and bridge it to
the external type.
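To make the suspected root cause concrete, here is a minimal, Flink-free Java model of the mismatch. It is only a sketch: every name in it (`LogicalKind`, `RowValue`, `RawValue`, `inferFromSignature`, `inferFromOperand`, `getElement`) is hypothetical and merely plays the role of the planner pieces seen in the trace (`GenericRowData`, `RawValueData`, the array element getter). The assumption that an unrecognized element class in the signature falls back to `RAW` is taken from the behaviour described above, not from Flink's actual extraction code.

```java
import java.util.List;

// Hypothetical, Flink-free model of the bug; none of these are Flink classes.
final class SignatureVsOperandTypeSketch {

    // Simplified stand-in for logical type roots.
    enum LogicalKind { INT, STRING, ROW, RAW }

    // Stand-in for an internal row value (plays the role of GenericRowData).
    static final class RowValue {
        final List<Object> fields;
        RowValue(List<Object> fields) { this.fields = fields; }
    }

    // Stand-in for an internal raw value (plays the role of RawValueData).
    static final class RawValue {
        final Object payload;
        RawValue(Object payload) { this.payload = payload; }
    }

    // Signature-only inference: look only at the element class declared in the
    // UDF signature. ASSUMPTION: unrecognized classes fall back to RAW, which is
    // the behaviour the issue describes.
    static LogicalKind inferFromSignature(Class<?> declaredElementClass) {
        if (declaredElementClass == Integer.class) return LogicalKind.INT;
        if (declaredElementClass == String.class) return LogicalKind.STRING;
        return LogicalKind.RAW;
    }

    // Operand-aware inference: keep the logical type already known for the
    // operand; the declared class would only be used afterwards to pick the
    // conversion to the external type (the "bridging" step).
    static LogicalKind inferFromOperand(LogicalKind operandType) {
        return operandType;
    }

    // Element access whose cast depends on the inferred kind, in the spirit of
    // an array element getter: a wrong kind means a wrong cast.
    static Object getElement(Object[] array, int pos, LogicalKind kind) {
        switch (kind) {
            case RAW:
                return (RawValue) array[pos]; // ClassCastException if the element is a row
            case ROW:
                return (RowValue) array[pos];
            default:
                return array[pos];
        }
    }

    public static void main(String[] args) {
        // The operand really is an array of rows, i.e. ARRAY<ROW<...>>.
        Object[] operand = { new RowValue(List.of(1, "a")) };

        LogicalKind fromSignature = inferFromSignature(RowValue.class);
        LogicalKind fromOperand = inferFromOperand(LogicalKind.ROW);
        System.out.println("signature-only: " + fromSignature);
        System.out.println("operand-aware: " + fromOperand);

        try {
            getElement(operand, 0, fromSignature);
        } catch (ClassCastException e) {
            System.out.println("signature-only getter fails: " + e.getClass().getSimpleName());
        }
        Object element = getElement(operand, 0, fromOperand);
        System.out.println("operand-aware getter returns: " + element.getClass().getSimpleName());
    }
}
```

In this model the signature-only path picks `RAW` and its getter fails with a `ClassCastException`, analogous to `GenericArrayData.getRawValue` in the trace, while the operand-aware path keeps `ROW` and returns the row. The design mirrors the direction suggested above: the operand supplies the type, and the declared external class is only the conversion target.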
--
This message was sent by Atlassian Jira
(v8.20.10#820010)