I was not able to reproduce it by re-running the same job with an updated
Kryo library. The join doesn't do anything special.

On Sun, Dec 19, 2021 at 4:58 PM Dan Hill <quietgol...@gmail.com> wrote:

> I'll retry the job to see if it's reproducible. The serialized state is
> bad, so that run keeps failing.
>
> On Sun, Dec 19, 2021 at 4:28 PM Zhipeng Zhang <zhangzhipe...@gmail.com>
> wrote:
>
>> Hi Dan,
>>
>> Could you provide a code snippet so that we can reproduce the bug
>> here?
>>
>> On Mon, Dec 20, 2021 at 7:18 AM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> I was curious whether anyone else has hit this exception. I'm using the
>>> IntervalJoinOperator to join two streams of protos, and I registered the
>>> protos with a Kryo serializer. I started hitting this issue, which looks
>>> like the operator is trying to deserialize a bad set of bytes that it
>>> serialized itself. I'm not doing anything weird or custom with the code;
>>> it's a pretty simple interval join.
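The trace in this report ends in `EOFException: No more bytes left.` while a protobuf serializer reads a byte array. A minimal sketch of why "a bad set of bytes" can surface that way, assuming a simple length-prefixed framing: a 4-byte big-endian length followed by the payload (Kryo itself writes a variable-length int, and `LengthPrefixedFraming`, `frame`, `unframe`, and `isReadable` are hypothetical names, not chill or Flink code). The reader trusts the declared length, so a buffer shorter than promised fails at end-of-input, while a well-formed empty payload decodes without error.

```java
import java.io.EOFException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical length-prefixed framing, for illustration only (not chill or Flink code).
// Assumption: a fixed 4-byte big-endian length; Kryo writes a variable-length int instead.
class LengthPrefixedFraming {

    // Serialize: length first, then the payload bytes.
    static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Deserialize: trust the declared length and fail if the buffer cannot supply it.
    // The unchecked wrapper around EOFException loosely mirrors the
    // KryoException -> EOFException chain seen in the trace.
    static byte[] unframe(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.remaining() < Integer.BYTES) {
            throw new UncheckedIOException(new EOFException("No more bytes left."));
        }
        int length = buf.getInt();
        if (length < 0 || buf.remaining() < length) {
            throw new UncheckedIOException(new EOFException("No more bytes left."));
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }

    // Convenience check: true if the bytes decode cleanly under this format.
    static boolean isReadable(byte[] framed) {
        try {
            unframe(framed);
            return true;
        } catch (UncheckedIOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[] {1, 2, 3, 4});
        System.out.println(Arrays.toString(unframe(framed)));   // [1, 2, 3, 4]

        // An empty payload is well-formed: a zero length and no body.
        System.out.println(unframe(frame(new byte[0])).length); // 0

        // Dropping the last byte leaves the declared length unsatisfied.
        byte[] truncated = Arrays.copyOf(framed, framed.length - 1);
        try {
            unframe(truncated);
        } catch (UncheckedIOException e) {
            System.out.println(e.getCause()); // java.io.EOFException: No more bytes left.
        }
    }
}
```

Under this assumed format, an empty payload on its own does not trigger the error; the declared length and the bytes actually stored have to disagree. Whether chill's reader behaves the same way would need to be checked against its source.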
>>>
>>> Has anyone hit this before? How have people solved this? I skimmed the
>>> operator code and don't see an easy way to exclude the bad serialized
>>> bytes.
>>>
>>> A couple of ideas:
>>> 1. I could fork the interval join code and add a route that handles bad
>>> serialization.
>>> 2. Maybe there's an edge case where the bytes become empty, and this is
>>> the exception thrown for an empty byte array?
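Idea 1 amounts to catching the failure at the point where buffered bytes are decoded and diverting them instead of failing the task. A hypothetical sketch of that routing in plain Java: the `QuarantiningDecoder` class and the toy `decodeLengthPrefixed` format (first byte is the payload length) are illustrative assumptions, not Flink or chill APIs. The delegate decoder is tried, and any `RuntimeException` sends a copy of the raw bytes to a quarantine list.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical wrapper, not a Flink or chill API: tries a delegate decoder
// and quarantines the raw bytes instead of propagating the failure.
class QuarantiningDecoder<T> {
    private final Function<byte[], T> delegate;
    private final List<byte[]> quarantined = new ArrayList<>();

    QuarantiningDecoder(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    // Returns the decoded value, or Optional.empty() if decoding threw.
    Optional<T> decode(byte[] bytes) {
        try {
            return Optional.ofNullable(delegate.apply(bytes));
        } catch (RuntimeException e) {
            quarantined.add(bytes.clone()); // keep a copy for later inspection
            return Optional.empty();
        }
    }

    List<byte[]> quarantined() {
        return quarantined;
    }

    // Toy format (assumption): first byte is the payload length, then ASCII text.
    static String decodeLengthPrefixed(byte[] bytes) {
        if (bytes.length == 0 || bytes[0] < 0 || bytes.length - 1 < bytes[0]) {
            throw new IllegalArgumentException("No more bytes left.");
        }
        return new String(bytes, 1, bytes[0], StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        QuarantiningDecoder<String> decoder =
                new QuarantiningDecoder<>(raw -> decodeLengthPrefixed(raw));
        System.out.println(decoder.decode(new byte[] {2, 'o', 'k'})); // Optional[ok]
        System.out.println(decoder.decode(new byte[] {5, 'b', 'a'})); // Optional.empty
        System.out.println(decoder.quarantined().size());             // 1
    }
}
```

In a forked interval join, an equivalent try/catch would presumably have to wrap the state read, and every skipped entry silently drops potential join matches, so counting or logging the quarantined bytes seems prudent.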
>>>
>>> Could this be a version issue? I'm using Flink v1.12.3 and
>>> Twitter/chill v0.9.4.
>>>
>>> Thoughts?
>>>
>>>
>>> java.lang.RuntimeException: Could not create class com.myexample.proto.MyJoinedOutput
>>>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:452) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:401) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:137) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:393) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:127) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.addToBuffer(IntervalJoinOperator.java:282) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement(IntervalJoinOperator.java:234) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement1(IntervalJoinOperator.java:194) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
>>>
>>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
>>>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>         ... 27 more
>>>
>>> Caused by: java.io.EOFException: No more bytes left.
>>>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>         at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>         ... 27 more
>>>
>>
>>
>> --
>> best,
>> Zhipeng
>>
>>
