[ https://issues.apache.org/jira/browse/FLINK-21876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Godfrey He updated FLINK-21876:
-------------------------------
    Fix Version/s: 1.17.0
                       (was: 1.16.0)

> Handle it properly when the returned value of Python UDF doesn't match the defined result type
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21876
>                 URL: https://issues.apache.org/jira/browse/FLINK-21876
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>    Affects Versions: 1.10.0, 1.11.0, 1.12.0
>            Reporter: Dian Fu
>            Priority: Minor
>              Labels: auto-deprioritized-major, auto-unassigned
>             Fix For: 1.12.8, 1.17.0
>
>
> Currently, when the value returned by a Python UDF doesn't match the UDF's
> defined result type, the following exception is thrown during execution:
> {code:java}
> Caused by: java.io.EOFException
>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
>     at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> The exception isn't straightforward, and it's difficult for users to figure
> out the root cause of the issue from it.
> As Python is a dynamic language, this case should be very common, and it
> would be great if we could handle it properly.
> See
> [https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully]
> for more details.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)