Hi Yuval,
we should definitely find the root cause of this issue. It helps that the
exception happens frequently; that makes the problem easier to nail down.
Have you tried replacing the JSON object with a regular String? If the
exception is gone after this change, I believe it must be the
serialization and not the network stack.
Regards,
Timo
On 28.01.21 10:29, Yuval Itzchakov wrote:
Hi,
I previously wrote about a problem I believed was caused by Kryo
serialization
(https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E),
which I am no longer sure is the case.
I have a job which involves a TableScan via a custom source operator
that generates a DataStream[RowData], a UDF that parses a String into an
io.circe.Json object (which internally flows as a RAW('io.circe.Json')
data type), and an AggregateFunction with a java.util.List accumulator
which returns one of these objects and is used in a tumbling window as
follows:
SELECT any_json_array_value(parse_array(resources)) as resources_sample
FROM foo
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
It generates the following physical plan:
optimize result:
Sink(table=[catalog.default-db.foo], fields=[resources_sample])
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
   +- Exchange(distribution=[single])
      +- Calc(select=[event_time, parse_array(resources) AS $f1])
         +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
            +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])
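For reference, the aggregate in the query boils down to the following accumulator shape. This is a pure-Java sketch under my own naming: the Flink AggregateFunction wrapper, the window machinery, and the io.circe.Json element type are left out, with String standing in for the JSON value and "pick the first element" standing in for "pick any".

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of any_json_array_value(parse_array(...)): collect the values
 *  parsed from each row's array into a java.util.List accumulator and
 *  emit one of them when the window fires. */
public class AnyJsonArrayValue {
    private final List<String> acc = new ArrayList<>(); // the java.util.List accumulator

    /** Called once per row with the already-parsed array elements. */
    public void accumulate(List<String> parsedArray) {
        acc.addAll(parsedArray);
    }

    /** Returns one accumulated value (the resources_sample), or null if empty. */
    public String getValue() {
        return acc.isEmpty() ? null : acc.get(0);
    }
}
```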
When I run my job, I receive the following exception after 10 to 30
seconds (the timing varies, which gives me a hunch this is related to
some race condition):
Caused by: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=0}
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
    ... 8 more
Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019, length: 546153590, index: 43, offset: 0
    at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
    at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
    ... 11 more
Just to note that right now I am using the built-in Kryo serialization
with no custom serializers.
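For context, the "consumed more bytes than the record had" check compares the bytes a deserializer actually read against the length prefix written alongside the record. Here is a minimal pure-Java sketch of that sanity check; the real logic lives in Flink's NonSpanningWrapper, and the names below are illustrative only.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

public class OverconsumeDemo {
    /** Reads one length-prefixed record, failing if the (possibly broken)
     *  deserializer consumed more bytes than the length prefix declared.
     *  bytesDeserializerReads simulates how many bytes a buggy
     *  deserializer pulls from the buffer. */
    public static byte[] readRecord(ByteBuffer buf, int bytesDeserializerReads)
            throws IOException {
        int declaredLength = buf.getInt();   // length prefix written by the sender
        int before = buf.position();
        byte[] payload = new byte[bytesDeserializerReads];
        buf.get(payload);                    // the deserializer consumes bytes
        if (buf.position() - before > declaredLength) {
            throw new IOException(
                "Serializer consumed more bytes than the record had.");
        }
        return payload;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(10);
        buf.putInt(4).put(new byte[] {1, 2, 3, 4, 5, 6}); // declares 4 bytes, 6 follow
        buf.flip();

        readRecord(buf.duplicate(), 4);      // consistent read: fine
        try {
            readRecord(buf.duplicate(), 6);  // overconsuming read: rejected
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```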
Upon further investigation, it seems that StreamElementSerializer is
receiving a corrupt stream, which causes it to think it received a
record with a timestamp:
(debugger screenshot image.png not included in the archive)
As you can see, the tag is incorrectly read as 0, so a timestamp read is
attempted and yields an invalid value (16), and once the TypeSerializer
(BinaryRowDataSerializer) tries to decode what follows, it fails with
the index-out-of-bounds exception.
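To illustrate the failure mode: StreamElementSerializer frames each element with a one-byte tag, and on a misaligned stream payload bytes get reinterpreted as a tag and timestamp. A minimal pure-Java sketch of that effect, assuming the tag constants from Flink's StreamElementSerializer (0 = record with timestamp, 1 = record without); this is not Flink's actual code.

```java
import java.nio.ByteBuffer;

public class CorruptTagDemo {
    /** Reads the one-byte element tag; for tag 0 ("record with
     *  timestamp") the next 8 bytes are consumed as the timestamp. */
    public static long readTimestampIfTagged(ByteBuffer buf) {
        byte tag = buf.get();
        if (tag == 0) {
            return buf.getLong(); // next 8 bytes become the "timestamp"
        }
        return Long.MIN_VALUE;    // no timestamp on this element
    }

    public static void main(String[] args) {
        // Well-formed frame: tag 1 (no timestamp), 4-byte length, payload.
        ByteBuffer frame = ByteBuffer.allocate(8);
        frame.put((byte) 1).putInt(3).put(new byte[] {7, 8, 9});
        frame.flip();

        // Misaligned stream: a stray zero byte precedes the frame, so the
        // tag reads as 0 and the real tag + length + payload are consumed
        // as one bogus 8-byte timestamp. Whatever follows would then be
        // parsed as lengths, producing the huge out-of-bounds values seen
        // in the stack trace above.
        ByteBuffer corrupt = ByteBuffer.allocate(9);
        corrupt.put((byte) 0).put(frame);
        corrupt.flip();
        System.out.println(readTimestampIfTagged(corrupt)); // garbage timestamp
    }
}
```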
The interesting thing is that when I *disable operator chaining*
completely via StreamExecutionEnvironment.disableOperatorChaining(), the
problem does not reproduce.
I am wondering which parts of the networking and serialization stack I
should look into to further investigate this issue and understand what
is causing the corrupt stream, or whether there are additional logs that
could assist.
--
Best Regards,
Yuval Itzchakov.