Hi Yuval,

We should definitely find the root cause of this issue. It helps that the exception happens frequently; that makes it easier to nail down the problem.

Have you tried replacing the JSON object with a regular String? If the exception is gone after this change, it must be the serialization and not the network stack.
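
For example, just a sketch of what I mean (the class name here is hypothetical, adapted from your description below):

    import org.apache.flink.table.functions.ScalarFunction

    // Hypothetical debugging variant of the parsing UDF: return the raw
    // string instead of an io.circe.Json (RAW type). If the exception
    // disappears with this version, the RAW/Kryo serialization path is
    // the likely culprit rather than the network stack.
    class ParseArrayAsString extends ScalarFunction {
      def eval(resources: String): String = resources
    }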

Regards,
Timo


On 28.01.21 10:29, Yuval Itzchakov wrote:
Hi,

I previously wrote about a problem I believed was caused by Kryo serialization (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E), which I am no longer sure is the case.

I have a job that involves a TableScan via a custom source operator that generates a DataStream[RowData], a UDF that parses a String into an io.circe.Json object (which internally flows as a RAW('io.circe.Json') data type), and an AggregateFunction with a java.util.List accumulator that returns one of these objects. The aggregate is used in a tumbling window as follows (a simplified sketch of the two functions follows the query):

     SELECT any_json_array_value(parse_array(resources)) as resources_sample
     FROM foo
     GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
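
For reference, the two functions look roughly like this (a simplified sketch; the actual parsing and "pick any value" logic is more involved):

    import java.util.{ArrayList => JArrayList, List => JList}

    import io.circe.Json
    import io.circe.parser.parse
    import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction}

    // parse_array: String => io.circe.Json. Json has no Table API type
    // mapping, so it flows through the plan as RAW('io.circe.Json').
    class ParseArray extends ScalarFunction {
      def eval(resources: String): Json = parse(resources).getOrElse(Json.Null)
    }

    // any_json_array_value: collects Json values in a java.util.List
    // accumulator and emits one of them at the end of the window.
    class AnyJsonArrayValue extends AggregateFunction[Json, JList[Json]] {
      override def createAccumulator(): JList[Json] = new JArrayList[Json]()
      override def getValue(acc: JList[Json]): Json =
        if (acc.isEmpty) Json.Null else acc.get(0)
      def accumulate(acc: JList[Json], value: Json): Unit = acc.add(value)
    }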

It generates the following physical plan:

optimize result:
Sink(table=[catalog.default-db.foo], fields=[resources_sample])
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
   +- Exchange(distribution=[single])
      +- Calc(select=[event_time, parse_array(resources) AS $f1])
         +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
            +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])

When I run my job, I receive the following exception after 10-30 seconds (the timing varies, which makes me suspect a race condition):

Caused by: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=0}
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
    ... 8 more
Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019, length: 546153590, index: 43, offset: 0
    at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
    at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
    ... 11 more

Note that right now I am using the built-in Kryo serialization with no custom serializers.

Upon further investigation, it seems that StreamElementSerializer is receiving a corrupt stream, which causes it to believe it has received a record with a timestamp:

[attached image: debugger screenshot of StreamElementSerializer.deserialize reading the corrupt stream]

As you can see, the tag is incorrectly read as 0, so a timestamp read is attempted, yielding an invalid value (16); once the TypeSerializer (BinaryRowDataSerializer) tries to decode the rest, it fails with the index-out-of-bounds exception.
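
For reference, the read path is roughly the following (a simplified paraphrase of the Flink 1.12 sources from memory, not an exact copy):

    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.core.memory.DataInputView
    import org.apache.flink.streaming.runtime.streamrecord.{StreamElement, StreamRecord}

    // Simplified paraphrase of StreamElementSerializer#deserialize:
    def deserializeSketch[T](source: DataInputView, inner: TypeSerializer[T]): StreamElement = {
      val tag = source.readByte()
      if (tag == 0) { // TAG_REC_WITH_TIMESTAMP
        val timestamp = source.readLong() // on a misaligned buffer this is garbage
        new StreamRecord[T](inner.deserialize(source), timestamp)
      } else if (tag == 1) { // TAG_REC_WITHOUT_TIMESTAMP
        new StreamRecord[T](inner.deserialize(source))
      } else {
        // ... watermarks, latency markers, stream status ...
        throw new java.io.IOException(s"Corrupt stream, unknown tag: $tag")
      }
    }

With a misaligned buffer the tag byte happens to be 0, so eight garbage bytes are consumed as a timestamp and the inner BinaryRowDataSerializer then reads a bogus length, which surfaces as the IndexOutOfBoundsException above.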

The interesting thing is that when I *disable operator chaining* completely via StreamExecutionEnvironment.disableOperatorChaining(), the problem does not reproduce.
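
Concretely, a minimal sketch of the workaround (assuming the streaming environment is created as usual):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Workaround that makes the corruption disappear: put every operator
    // in its own task, removing the direct object handover between
    // chained operators.
    env.disableOperatorChaining()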

I am wondering which parts of the networking and serialization stack I should look at to further investigate what is causing the corrupt stream to emerge, or whether there are additional logs that could assist.

--
Best Regards,
Yuval Itzchakov.
