Hello,

I’m trying to convert some of our larger stateful computations into something 
that aligns more with the Flink windowing framework, and in particular to start 
using “event time” instead of “ingestion time” as the time characteristic.

My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka 
source). The data is generally time-ordered, but there are some upstream 
races, so I’m assigning timestamps and watermarks using 
BoundedOutOfOrdernessTimestampExtractor with a lateness of 30 seconds. When I 
assign timestamps directly in the Kafka sources (I’m also connecting two Kafka 
streams here) using FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things 
work OK, but my extractor has to do a bunch of “faking” because not every 
record that is produced will have a valid timestamp. For example, a record 
that can’t be parsed won’t have one.
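
To make the setup concrete, here is a minimal Java sketch of the 
bounded-out-of-orderness idea: the watermark trails the largest timestamp seen 
so far by the configured lateness (30 seconds here). It is a standalone 
illustration, not Flink's BoundedOutOfOrdernessTimestampExtractor and not the 
extractor from the job; the class and method names are hypothetical.

```java
/** Hypothetical stand-in for a bounded-out-of-orderness assigner; not a Flink class. */
public class BoundedLatenessSketch {
    private final long maxOutOfOrdernessMs;
    // Largest event timestamp seen so far. The initial value is chosen so that
    // subtracting the lateness below cannot underflow.
    private long currentMaxTimestamp;

    public BoundedLatenessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records an event timestamp (epoch millis) and returns it unchanged. */
    public long onEvent(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return eventTimestampMs;
    }

    /** The watermark lags the maximum timestamp by the allowed out-of-orderness. */
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    /** True if a record with this timestamp is at or behind the current watermark. */
    public boolean isBehindWatermark(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedLatenessSketch sketch = new BoundedLatenessSketch(30_000L);
        sketch.onEvent(100_000L);
        sketch.onEvent(95_000L);   // out of order, does not move the maximum
        sketch.onEvent(140_000L);
        System.out.println(sketch.currentWatermark());          // prints 110000
        System.out.println(sketch.isBehindWatermark(105_000L)); // prints true
        System.out.println(sketch.isBehindWatermark(120_000L)); // prints false
    }
}
```

A record without a parseable timestamp has no sensible input to onEvent(), 
which is why the extractor in the Kafka source ends up faking one.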

When I assign timestamps downstream, after filtering the stream down to just 
records that are going to be windowed, I see errors in my Flink job:

java.io.IOException: Exception while applying AggregateFunction in aggregating state
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
        ... 6 more

I am calling aggregate() on my windows, but otherwise I see very little 
information that I can use to dig into this issue. Can anyone give me any 
insight into what is going wrong here? I’d much prefer assigning timestamps 
after filtering, rather than in the Kafka source, because I can filter down to 
only records that I know will have timestamps.

When experimenting with the lateness in my timestamp/watermark assigner, I also 
saw a similarly opaque exception:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
        at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
        at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
        at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
        at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
        at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
        …


Any tips?


Thanks,

Andrew
