I was not able to reproduce it by re-running the same job with an updated Kryo library. The join doesn't do anything special.
On Sun, Dec 19, 2021 at 4:58 PM Dan Hill <quietgol...@gmail.com> wrote:

> I'll retry the job to see if it's reproducible. The serialized state is
> bad, so that run keeps failing.
>
> On Sun, Dec 19, 2021 at 4:28 PM Zhipeng Zhang <zhangzhipe...@gmail.com>
> wrote:
>
>> Hi Dan,
>>
>> Could you provide a code snippet so that we can reproduce the bug
>> here?
>>
>> On Mon, Dec 20, 2021 at 07:18 Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> I was curious whether anyone else has hit this exception. I'm using the
>>> IntervalJoinOperator to join two streams of protos. I registered the
>>> protos with a Kryo serializer. I started hitting this issue, which looks
>>> like the operator is trying to deserialize a bad set of bytes that it
>>> serialized. I'm not doing anything weird or custom with the code. It's a
>>> pretty simple interval join.
>>>
>>> Has anyone hit this before? How have people solved this? I skimmed the
>>> operator code and don't see an easy way to exclude the bad serialized
>>> bytes.
>>>
>>> A couple of ideas:
>>> 1. I could fork the interval join code and have a route to handle bad
>>> serialization.
>>> 2. Maybe there's a weird case where the bytes become empty, and this is
>>> the exception thrown for an empty array of bytes?
>>>
>>> Could this be a version issue? My Flink version is v1.12.3 and
>>> Twitter/chill v0.9.4.
>>>
>>> Thoughts?
>>>
>>>
>>> java.lang.RuntimeException: Could not create class com.myexample.proto.MyJoinedOutput
>>>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:452) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:401) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:137) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:393) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:127) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.addToBuffer(IntervalJoinOperator.java:282) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement(IntervalJoinOperator.java:234) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement1(IntervalJoinOperator.java:194) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
>>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
>>>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>     ... 27 more
>>> Caused by: java.io.EOFException: No more bytes left.
>>>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
>>>     ... 27 more
>>
>> --
>> best,
>> Zhipeng
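Idea 2 in Dan's original message (empty bytes triggering the exception) can be checked outside Flink. The following is a self-contained Java sketch, not Flink's or chill's code. It assumes a length-prefixed layout (chill's ProtobufSerializer appears to write the message length and then the message bytes) and a strict read loop that, like the `NoFetchingInput.readBytes` frame in the trace is assumed to do, reports any `-1` from the stream as "No more bytes left." It uses `java.io.ByteArrayInputStream` as a stand-in for the state input stream; whether Flink's stream also returns `-1` for a zero-length read at the end of the data is an assumption. The names `frame`, `readFullyStrict`, `readFullyTolerant`, and `describe`, and the 1-byte length prefix, are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class EmptyPayloadEof {

    // Hypothetical framing: a 1-byte length prefix followed by the payload.
    // Assumed to resemble a length-prefixed protobuf encoding; not chill's exact format.
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(payload.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Strict read loop (assumed NoFetchingInput-like behavior): any -1 from the
    // stream is reported as EOF, even when zero bytes were requested.
    static void readFullyStrict(InputStream in, byte[] dst) throws IOException {
        int read = 0;
        while (true) {
            int c = in.read(dst, read, dst.length - read);
            if (c == -1) {
                throw new EOFException("No more bytes left.");
            }
            read += c;
            if (read == dst.length) {
                return;
            }
        }
    }

    // Tolerant variant: a zero-length payload needs no read at all.
    static void readFullyTolerant(InputStream in, byte[] dst) throws IOException {
        if (dst.length > 0) {
            readFullyStrict(in, dst);
        }
    }

    // Decodes one framed record and reports "ok:<length>" or "eof:<message>".
    static String describe(byte[] framed, boolean tolerant) {
        InputStream in = new ByteArrayInputStream(framed);
        try {
            int size = in.read(); // 1-byte length prefix
            byte[] payload = new byte[size];
            if (tolerant) {
                readFullyTolerant(in, payload);
            } else {
                readFullyStrict(in, payload);
            }
            return "ok:" + payload.length;
        } catch (EOFException e) {
            return "eof:" + e.getMessage();
        } catch (IOException e) {
            return "error:" + e.getMessage();
        }
    }

    public static void main(String[] args) {
        byte[] nonEmpty = frame(new byte[] {8, 1}); // two arbitrary payload bytes
        byte[] empty = frame(new byte[0]);          // a payload that encodes to zero bytes
        System.out.println(describe(nonEmpty, false)); // ok:2
        System.out.println(describe(empty, false));    // eof:No more bytes left.
        System.out.println(describe(empty, true));     // ok:0
    }
}
```

In this sketch the failure needs two things at once: a message that encodes to zero bytes (for example, a proto3 message with every field at its default value) and that message sitting at the very end of the buffer. If the real job behaves the same way, that could explain why a re-run did not reproduce the error, but the thread does not confirm this.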