Hi Timo,

I tried replacing it with an ordinary ARRAY<STRING> DataType, and the issue no
longer reproduces.
If I use a RawType(Array[String]) instead, the problem still manifests, so I
assume it isn't directly related to the Kryo serialization of the specific
underlying type (io.circe.Json), but rather to the way RAW values interact with
BinaryRawValueData and how they are written out to the network buffers behind
the scenes.
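
For reference, the two return types in that test were declared roughly like
this (an illustrative sketch, not the exact code from the job):

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
    import org.apache.flink.table.api.DataTypes

    // Variant 1: a plain array of strings -- the exception does NOT reproduce.
    val arrayType = DataTypes.ARRAY(DataTypes.STRING())

    // Variant 2: the same payload as a Kryo-backed RAW type, analogous to
    // RAW('io.circe.Json') -- the exception still reproduces.
    val rawType = DataTypes.RAW(
      classOf[Array[String]],
      new KryoSerializer(classOf[Array[String]], new ExecutionConfig))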

On Thu, Jan 28, 2021 at 5:26 PM Timo Walther <twal...@apache.org> wrote:

> Hi Yuval,
>
> we should definitely find the root cause of this issue. The fact that the
> exception happens frequently should help to nail down the problem.
>
> Have you tried replacing the JSON object with a regular String? If the
> exception is gone after this change, I believe it must be the
> serialization and not the network stack.
>
> Regards,
> Timo
>
>
> On 28.01.21 10:29, Yuval Itzchakov wrote:
> > Hi,
> >
> > I previously wrote about a problem I believed was caused by Kryo
> > serialization
> > (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E),
> > which I am no longer sure is the case.
> >
> > I have a job that involves a TableScan via a custom source operator
> > which generates a DataStream[RowData], a UDF that parses a String into
> > an io.circe.Json object (which internally flows as a RAW('io.circe.Json')
> > data type), and an AggregateFunction with a java.util.List accumulator
> > that returns one of these objects and is used in a tumbling window as
> > follows:
> >
> >      SELECT any_json_array_value(parse_array(resources)) AS resources_sample
> >      FROM foo
> >      GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
> >
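> > Sketched in code, the two functions look roughly like this (paraphrased
> > from the query above and simplified; the real implementations and any
> > extra type hints for the accumulator are omitted):
> >
> >      import org.apache.flink.table.annotation.DataTypeHint
> >      import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction}
> >
> >      // The parsed value flows through the plan as RAW('io.circe.Json'),
> >      // which falls back to Kryo serialization by default.
> >      class ParseArray extends ScalarFunction {
> >        @DataTypeHint(value = "RAW", bridgedTo = classOf[io.circe.Json])
> >        def eval(resources: String): io.circe.Json =
> >          io.circe.parser.parse(resources).fold(_ => io.circe.Json.Null, identity)
> >      }
> >
> >      // Collects the RAW values in a java.util.List accumulator and emits one of them.
> >      class AnyJsonArrayValue
> >          extends AggregateFunction[io.circe.Json, java.util.List[io.circe.Json]] {
> >        override def createAccumulator(): java.util.List[io.circe.Json] =
> >          new java.util.ArrayList[io.circe.Json]()
> >        override def getValue(acc: java.util.List[io.circe.Json]): io.circe.Json =
> >          if (acc.isEmpty) io.circe.Json.Null else acc.get(0)
> >        def accumulate(acc: java.util.List[io.circe.Json], value: io.circe.Json): Unit =
> >          acc.add(value)
> >      }
> >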
> > It generates the following physical plan:
> >
> > optimize result:
> > Sink(table=[catalog.default-db.foo], fields=[resources_sample])
> > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
> >    +- Exchange(distribution=[single])
> >       +- Calc(select=[event_time, parse_array(resources) AS $f1])
> >          +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
> >             +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])
> >
> > When I run my job, I receive the following exception after 10-30
> > seconds (the timing varies, which gives me a hunch that some race
> > condition is involved):
> >
> > Caused by: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=0}
> > at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
> > at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
> > at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
> > at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
> > at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
> > at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
> > ... 8 more
> > Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019, length: 546153590, index: 43, offset: 0
> > at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
> > at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
> > at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
> > at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
> > at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
> > at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
> > at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
> > at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> > at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
> > at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
> > ... 11 more
> >
> > To clarify, right now I am using the built-in Kryo serialization with
> > no custom serializers.
> >
> > Upon further investigation, it seems that StreamElementSerializer is
> > receiving a corrupt stream, which causes it to think it received a
> > record with a timestamp:
> >
> > (attached screenshot: image.png)
> >
> > As you can see, the tag is incorrectly read as 0, a timestamp is then
> > attempted to be read (yielding an invalid value, 16), and once the
> > TypeSerializer (BinaryRowDataSerializer) tries to decode what follows,
> > it fails with the index out of bounds exception.
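> >
> > (As a rough mental model of the framing involved -- this is just a sketch
> > based on my reading of StreamElementSerializer, not the actual job code --
> > every element is written as a 1-byte tag, an 8-byte timestamp when the tag
> > is 0, and then the payload produced by the inner type serializer:)
> >
> >      import org.apache.flink.api.common.typeutils.base.StringSerializer
> >      import org.apache.flink.core.memory.DataOutputSerializer
> >      import org.apache.flink.streaming.runtime.streamrecord.{StreamElementSerializer, StreamRecord}
> >
> >      // Serialize one record with a timestamp and look at the framing:
> >      // byte 0 is the tag (0 = record with timestamp), bytes 1-8 are the
> >      // timestamp, and the rest is the payload of the inner serializer.
> >      val serializer = new StreamElementSerializer(StringSerializer.INSTANCE)
> >      val out = new DataOutputSerializer(64)
> >      serializer.serialize(new StreamRecord[String]("hello", 42L), out)
> >      val bytes = out.getCopyOfBuffer
> >      println(s"tag=${bytes(0)}, length=${bytes.length}")
> >
> >      // If the reader starts misaligned, a payload byte gets interpreted as
> >      // the tag and 8 arbitrary bytes as the timestamp, and the inner
> >      // serializer (BinaryRowDataSerializer in my case) then reads garbage,
> >      // which would explain the absurd pos/length values above.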
> >
> > The interesting thing is that when I *disable operator chaining*
> > completely (via StreamExecutionEnvironment.disableOperatorChaining),
> > the problem does not reproduce.
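> >
> > (For completeness, that workaround looks roughly like this, assuming the
> > table environment is created on top of the underlying
> > StreamExecutionEnvironment:)
> >
> >      import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> >      import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> >
> >      val env = StreamExecutionEnvironment.getExecutionEnvironment
> >      // Disabling chaining makes the problem disappear, at the cost of extra
> >      // serialization between operators that would otherwise be chained.
> >      env.disableOperatorChaining()
> >      val tableEnv = StreamTableEnvironment.create(env)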
> >
> > I am wondering which parts of the networking and serialization stack I
> > should look into to further investigate this issue and understand what
> > is causing the corrupt stream, or whether there are additional logs
> > that could assist.
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
>
>

-- 
Best Regards,
Yuval Itzchakov.
