Hi Dan,
in my experience this kind of error is usually caused by some other problem
that's not immediately obvious (such as a serialization, memory, or RocksDB
problem).
Could it be that an Avro field cannot be null, or vice versa?
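
To make the failure mode concrete: the trace ends in "EOFException: No more
bytes left" inside a readBytes call, which is what a reader reports when it
expects more bytes than are actually stored. Below is a minimal sketch in
plain Java (no Flink, Kryo, or chill involved). It assumes a record layout of
a 4-byte length prefix followed by the payload; the class and method names are
made up for illustration, and this is not a copy of how ProtobufSerializer is
implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TruncatedRecordDemo {

    // Hypothetical writer: 4-byte big-endian length prefix, then the payload bytes.
    static byte[] write(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    // Hypothetical reader: trusts the length prefix and reads exactly that many bytes.
    static byte[] read(byte[] record) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            int size = in.readInt();          // EOFException if fewer than 4 bytes are present
            byte[] payload = new byte[size];
            in.readFully(payload);            // EOFException if fewer than `size` bytes remain
            return payload;
        }
    }

    // Returns a short description of what happened when reading the record.
    static String describe(byte[] record) throws IOException {
        try {
            return "read " + read(record).length + " payload bytes";
        } catch (EOFException e) {
            return "EOFException: stored bytes are shorter than the reader expects";
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] record = write("joined-output".getBytes(StandardCharsets.UTF_8));

        // Intact record: the payload round-trips.
        System.out.println("intact:    " + describe(record));

        // Record that lost its last two bytes somewhere between write and read.
        System.out.println("truncated: " + describe(Arrays.copyOf(record, record.length - 2)));

        // Completely empty value, as in the "empty array of bytes" idea.
        System.out.println("empty:     " + describe(new byte[0]));
    }
}
```

Both the truncated and the empty record fail in the same way, so the exception
alone does not tell you whether the state was written incorrectly or damaged
afterwards.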

On Tue, Dec 21, 2021 at 7:21 PM Dan Hill <quietgol...@gmail.com> wrote:

> I was not able to reproduce it by re-running the same job with an updated
> Kryo library.  The join doesn't do anything special.
>
> On Sun, Dec 19, 2021 at 4:58 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I'll retry the job to see if it's reproducible. The serialized state is
>> bad, so that run keeps failing.
>>
>> On Sun, Dec 19, 2021 at 4:28 PM Zhipeng Zhang <zhangzhipe...@gmail.com>
>> wrote:
>>
>>> Hi Dan,
>>>
>>> Could you provide a code snippet so that we can reproduce the bug?
>>>
>>> On Mon, Dec 20, 2021 at 07:18, Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Hi.
>>>>
>>>> I was curious if anyone else has hit this exception.  I'm using the
>>>> IntervalJoinOperator to join two streams of protos.  I registered the
>>>> protos with a Kryo serializer.  I started hitting this issue, which looks
>>>> like the operator is trying to deserialize a bad set of bytes that it
>>>> serialized itself.  I'm not doing anything weird or custom with the code.
>>>> It's a pretty simple interval join.
>>>>
>>>> Has anyone hit this before?  How have people solved this?  I skimmed
>>>> the operator code and don't see an easy way to exclude the bad serialized
>>>> bytes.
>>>>
>>>> A couple of ideas:
>>>> 1. I could fork the interval join code and add a code path that handles
>>>> bad serialization.
>>>> 2. Maybe there's a weird case where the bytes become empty, and this is
>>>> the exception thrown for an empty array of bytes?
>>>>
>>>> Could this be a version issue?  My Flink version is v1.12.3 and
>>>> Twitter/chill v0.9.4.
>>>>
>>>> Thoughts?
>>>>
>>>>
>>>> java.lang.RuntimeException: Could not create class com.myexample.proto.MyJoinedOutput
>>>>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:452) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:401) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:137) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:393) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:127) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.addToBuffer(IntervalJoinOperator.java:282) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement(IntervalJoinOperator.java:234) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement1(IntervalJoinOperator.java:194) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
>>>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
>>>>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>>         ... 27 more
>>>> Caused by: java.io.EOFException: No more bytes left.
>>>>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>>         ... 27 more
>>>>
>>>
>>>
>>> --
>>> best,
>>> Zhipeng
>>>
>>>
