It seems an exception thrown when Flink try to deserialize the object outputed 
by your udf. So is the obejct produced by your udf serializable? Does it 
contain any lambda function in the object/class? 

Best regards, 
Yuxia 


发件人: "Tom Thornton" <thom...@yelp.com> 
收件人: "User" <user@flink.apache.org> 
发送时间: 星期五, 2022年 5 月 27日 上午 6:47:04 
主题: Exception when running Java UDF with Blink table planner 

We are migrating from the legacy table planner to the Blink table planner. 
Previously we had a UDF defined like this that worked without issue: 
public class ListToString extends DPScalarFunction { 
public String eval (List list) { 
return "foo" ; 
} 
Since moving to the Blink table planner and receiving this error: 

Caused by: org.apache.flink.table.api.ValidationException: Given parameters of 
function 'ListToString' do not match any signature. 
Actual: (java.lang.String[]) 
Expected: (java.util.List) 

We refactored the UDF to take as input an Object[] to match what is received 
from Blink: 
public class ListToString extends DPScalarFunction { 
public String eval (Object[] arr) { return "foo" ; 
} 
} 
Now the UDF always fails (including for the simplified example above where we 
return a constant string regardless of input). For example, when we run on a 
query like this one: 
SELECT ListToString(`col1`) as col1_string FROM `table` 
Produces an IndexOutOfBoundsException: 
Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for 
length 0 
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) 
at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
 
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248) 
at java.base/java.util.Objects.checkIndex(Objects.java:372) 
at java.base/java.util.ArrayList.get(ArrayList.java:459) 
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
 
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) 
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) 
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
 
at 
org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
 
at 
org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
 
at 
org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
 
at 
org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
 
at StreamExecCalc$337.processElement(Unknown Source) 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 
at SourceConversion$328.processElement(Unknown Source) 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
 
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114)
 
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
 
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187)
 
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146)
 
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:833)
 
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:825)
 
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)
 
Any ideas what may be causing this? 

Reply via email to