This is helpful information. So I guess the problem must be in the flink-table module and not in flink-core. I will try to reserve some time tomorrow to look into the code again. How did you express RawType(Array[String])? Again with a fully serialized type string?
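
(By "fully serialized type string" I mean the output of
RawType#asSerializableString, i.e. something of this shape, where both the
class name and the snapshot below are only illustrative:

    RAW('[Ljava.lang.String;', '<base64-encoded serializer snapshot>')
)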

Could it be related to https://issues.apache.org/jira/browse/FLINK-20986 ?

Regards,
Timo


On 28.01.21 16:30, Yuval Itzchakov wrote:
Hi Timo,

I tried replacing it with an ordinary ARRAY<STRING> DataType, which doesn't reproduce the issue. If I use a RawType(Array[String]), the problem still manifests, so I assume it's not directly related to the Kryo serialization of the specific underlying type (io.circe.Json), but rather to the way it interacts with BinaryRawValueData and writing out to the network buffer behind the scenes.
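
For reference, the two variants I compared were declared roughly like this
(an illustrative sketch, not the exact code from my job):

    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.table.api.DataTypes

    // plain array of strings: the corruption does not reproduce
    val asStrings = DataTypes.ARRAY(DataTypes.STRING())

    // RAW type wrapping the array's TypeInformation-based serializer:
    // the corruption still reproduces
    val asRaw = DataTypes.RAW(Types.OBJECT_ARRAY(Types.STRING))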

On Thu, Jan 28, 2021 at 5:26 PM Timo Walther <twal...@apache.org> wrote:

    Hi Yuval,

    we should definitely find the root cause of this issue. The fact that
    the exception happens frequently should help to nail down the problem.

    Have you tried to replace the JSON object with a regular String? If the
    exception is gone after this change, I believe it must be the
    serialization and not the network stack.
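
    Something along these lines would do for the experiment (a hypothetical
    sketch; the function name and parsing logic are illustrative):

        import org.apache.flink.table.annotation.DataTypeHint
        import org.apache.flink.table.functions.ScalarFunction

        // returns plain strings, so no RAW type is involved on the wire
        class ParseArrayAsStrings extends ScalarFunction {
          @DataTypeHint("ARRAY<STRING>")
          def eval(resources: String): Array[String] =
            io.circe.parser.parse(resources).toOption
              .flatMap(_.asArray)
              .map(_.map(_.noSpaces).toArray)
              .getOrElse(Array.empty[String])
        }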

    Regards,
    Timo


    On 28.01.21 10:29, Yuval Itzchakov wrote:
     > Hi,
     >
     > I previously wrote about a problem I believed was caused by Kryo
     > serialization
     > (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E),
     > which I am no longer sure is the case.
     >
     > I have a job which involves a TableScan via a custom source operator
     > that generates a DataStream[RowData], a UDF that parses a String into
     > an io.circe.Json object (which internally flows as a
     > RAW('io.circe.Json') data type), and an AggregateFunction with a
     > java.util.List accumulator that returns one of these objects and is
     > used in a tumbling window as follows (a rough sketch of the aggregate
     > follows the query):
     >
     >     SELECT any_json_array_value(parse_array(resources)) AS resources_sample
     >     FROM foo
     >     GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
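     >
     > The aggregate is shaped roughly like this (a hypothetical sketch, not
     > the exact code from my job):
     >
     >     import java.util.{ArrayList, List => JList}
     >     import org.apache.flink.table.functions.AggregateFunction
     >     import io.circe.Json
     >
     >     // accumulates parsed JSON values and returns one of them
     >     class AnyJsonArrayValue extends AggregateFunction[Json, JList[Json]] {
     >       override def createAccumulator(): JList[Json] = new ArrayList[Json]()
     >       override def getValue(acc: JList[Json]): Json =
     >         if (acc.isEmpty) Json.Null else acc.get(acc.size() - 1)
     >       // called per row with the RAW array produced by parse_array
     >       def accumulate(acc: JList[Json], values: Array[Json]): Unit =
     >         if (values != null) values.foreach(acc.add)
     >     }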
     >
     > It generates the following physical plan:
     >
     > optimize result:
     > Sink(table=[catalog.default-db.foo], fields=[resources_sample])
     > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
     >    +- Exchange(distribution=[single])
     >       +- Calc(select=[event_time, parse_array(resources) AS $f1])
     >          +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
     >             +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])
     >
     > When I run my job, I receive the following exception after 10-30
     > seconds (the timing varies, which gives me a hunch that some race
     > condition is involved):
     >
     > Caused by: java.io.IOException: Can't get next record for channel
     > InputChannelInfo{gateIdx=0, inputChannelIdx=0}
     >   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
     >   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
     >   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
     >   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
     >   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
     >   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
     >   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
     >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
     >   at java.lang.Thread.run(Thread.java:748)
     > Caused by: java.io.IOException: Serializer consumed more bytes than the
     > record had. This indicates broken serialization. If you are using custom
     > serialization types (Value or Writable), check their serialization
     > methods. If you are using a Kryo-serialized type, check the
     > corresponding Kryo serializer.
     >   at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
     >   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
     >   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
     >   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
     >   ... 8 more
     > Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019,
     > length: 546153590, index: 43, offset: 0
     >   at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
     >   at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
     >   at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
     >   at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
     >   at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
     >   at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
     >   at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
     >   at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
     >   at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
     >   at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
     >   ... 11 more
     >
     > Note that right now I am using the built-in Kryo serialization with no
     > custom serializers.
     >
     > Upon further investigation, it seems that StreamElementSerializer is
     > receiving a corrupt stream, which causes it to think it received a
     > record with a timestamp:
     >
     > [attached screenshot: debugger view of the corrupt StreamElement]
     >
     > As you can see, the tag is incorrectly read as 0, so a timestamp is
     > then read from the stream, yielding an invalid value (16). Once the
     > TypeSerializer (BinaryRowDataSerializer) tries to decode what follows,
     > it fails with the index-out-of-bounds exception.
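     >
     > (Paraphrasing the deserialization step involved; this is a sketch of
     > the logic, not the actual Flink source:)
     >
     >     // StreamElementSerializer.deserialize, roughly: a leading tag byte
     >     // selects the element kind, and tag 0 means "record with timestamp".
     >     // A shifted/corrupt stream that yields tag 0 therefore consumes 8
     >     // garbage bytes as a timestamp before handing the rest to the inner
     >     // serializer (BinaryRowDataSerializer), which then goes out of bounds.
     >     val tag = source.readByte()
     >     if (tag == 0) { // TAG_REC_WITH_TIMESTAMP
     >       val timestamp = source.readLong()
     >       val value = typeSerializer.deserialize(source)
     >     }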
     >
     > The interesting thing is that when I *disable operator chaining*
     > completely (via StreamExecutionEnvironment.disableOperatorChaining),
     > the problem does not reproduce.
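     >
     > (For completeness, that comparison run was set up like this:)
     >
     >     val env = StreamExecutionEnvironment.getExecutionEnvironment
     >     env.disableOperatorChaining()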
     >
     > I am wondering which parts of the networking and serialization stack
     > may help me further investigate this issue and understand what is
     > causing the corrupt stream to emerge, or whether there are additional
     > logs that could assist.
     >
     > --
     > Best Regards,
     > Yuval Itzchakov.



--
Best Regards,
Yuval Itzchakov.
