Hi,

Thanks for the extra info; it was helpful (I’m not sure why your first logs
didn’t include the full trace, though).

I spent some time digging through the error trace, and I have a few
observations I would like to go through first:

1. It seems that the ArrayIndexOutOfBoundsException was thrown while accessing
the state and making a copy of it (via serialization) in the
CopyOnWriteStateTable.
2. The state that caused the exception seems to be the state of the reducing
window function (i.e. the maxBy). The state type should be the same as the type
of the records in your `events` DataStream, which seems to be a Scala case class
with a nested field that requires Kryo for serialization.
3. For some reason, Kryo failed with the ArrayIndexOutOfBoundsException when
trying to copy that field.
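To illustrate the delegation visible in the trace (CaseClassSerializer.copy
calling CaseClassSerializer.copy, which calls KryoSerializer.copy), here is a toy
model in plain Scala. It is not Flink's implementation; `Copier`, `Stamp`,
`Inner`, `Outer` and `FallbackCopier` are hypothetical names, and `Stamp` only
stands in for a field type (such as Joda's DateTime) that has no dedicated Flink
serializer and therefore goes through the generic Kryo path.

```scala
// Toy model of the copy chain seen in the stack trace; these are NOT Flink's classes.
trait Copier[T] { def copy(t: T): T }

// Hypothetical stand-in for a field type without a dedicated serializer
// (in the real job, e.g. a Joda DateTime handled by Kryo).
final class Stamp(var millis: Long)

// Generic fallback copier: returns a fresh instance sharing no mutable state.
object FallbackCopier extends Copier[Stamp] {
  def copy(s: Stamp): Stamp = new Stamp(s.millis)
}

final case class Inner(time: Stamp, label: String)
final case class Outer(id: String, values: Inner)

// Case-class copiers copy field by field and delegate nested fields to their own copiers,
// which is why the trace shows two case-class frames before the generic one.
object InnerCopier extends Copier[Inner] {
  def copy(i: Inner): Inner = Inner(FallbackCopier.copy(i.time), i.label)
}
object OuterCopier extends Copier[Outer] {
  def copy(o: Outer): Outer = Outer(o.id, InnerCopier.copy(o.values))
}
```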

My current guess is that the serializer used internally may have been
incorrectly shared between threads, which would explain why this exception
seems to happen at random for you.
I recall similar issues in the past, caused by the fact that some
KryoSerializers aren't thread-safe and were incorrectly shared in Flink.
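A toy model of the suspected problem, in plain Scala (the `ScratchCopier` class
and the `copyConcurrently` helper are hypothetical, not Kryo's or Flink's code):
a copier that keeps mutable scratch state during a copy works fine on one
thread, but a single shared instance can have that state corrupted by a
concurrent caller. An unbalanced pop on its internal stack, for example, reads
index -1, similar to the `IntArray.pop` frame in the trace. The usual remedy,
and the idea behind Flink's `TypeSerializer#duplicate()`, is to give each thread
its own instance.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Toy stateful copier (NOT Kryo's code): it keeps mutable scratch state while copying,
// so one instance must not be used by two threads at once.
final class ScratchCopier {
  private val stack = new Array[Int](16)
  private var size = 0

  private def push(v: Int): Unit = { stack(size) = v; size += 1 }
  // If another thread had already popped "our" entry, size would be 0 here and
  // stack(-1) would throw java.lang.ArrayIndexOutOfBoundsException.
  private def pop(): Int = { size -= 1; stack(size) }

  // Copies up to 16 ints by pushing them and popping them back.
  def copy(xs: List[Int]): List[Int] = {
    xs.foreach(push)
    var out = List.empty[Int]
    xs.foreach { _ => out = pop() :: out } // popping reverses, prepending restores order
    out
  }

  // The usual remedy: each thread works on its own instance.
  def duplicate(): ScratchCopier = new ScratchCopier
}

// Runs `rounds` copies on each of `threads` threads, each thread using its own
// duplicate, and returns how many copies came back equal to the input.
def copyConcurrently(proto: ScratchCopier, threads: Int, rounds: Int): Int = {
  val pool = Executors.newFixedThreadPool(threads)
  val ok = new AtomicInteger(0)
  val input = List(1, 2, 3, 4, 5)
  for (_ <- 1 to threads) pool.execute(new Runnable {
    def run(): Unit = {
      val local = proto.duplicate() // per-thread instance: no shared mutable state
      for (_ <- 1 to rounds) if (local.copy(input) == input) ok.incrementAndGet()
    }
  })
  pool.shutdown()
  pool.awaitTermination(30, TimeUnit.SECONDS)
  ok.get
}
```

The sketch only exercises the safe, per-thread path; sharing one instance across
threads would fail nondeterministically, which is consistent with the exception
appearing at random.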

I may need some help from you to look into this a bit more:
- Could you disable asynchronous snapshots and run this job a bit longer to see
whether the problem still occurs? This is mainly to confirm or rule out my guess
that there is some incorrect serializer usage in the CopyOnWriteStateTable.
- Could you let us know what the case class of the records in your `events`
DataStream looks like?
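For context on the first request, here is a simplified toy model of the
copy-on-write idea (the `CowTableSketch` class is hypothetical and is not Flink's
actual `CopyOnWriteStateTable`): once a snapshot starts, reading an entry that
the snapshot still references first replaces it with a copy made by the state's
copy function, so the snapshot keeps the old object. That copy on read is the
step where the trace enters `CaseClassSerializer.copy`.

```scala
import scala.collection.mutable

// Toy copy-on-write table (NOT Flink's code): only models the rule that a read
// during a running snapshot returns a copy, leaving the snapshotted object untouched.
final class CowTableSketch[K, V](copy: V => V) {
  private final class Entry(var value: V, var version: Int)
  private val entries = mutable.HashMap.empty[K, Entry]
  private var tableVersion = 0
  private var requiredVersion = 0 // highest version still referenced by a running snapshot

  def put(key: K, value: V): Unit = entries.update(key, new Entry(value, tableVersion))

  // Starting a snapshot bumps the version; existing entries become shared with it.
  def startSnapshot(): Map[K, V] = {
    tableVersion += 1
    requiredVersion = tableVersion
    entries.iterator.map { case (k, e) => k -> e.value }.toMap
  }

  def releaseSnapshot(): Unit = requiredVersion = 0

  // Reading an entry still shared with a snapshot replaces it with a copy first.
  def get(key: K): Option[V] = entries.get(key).map { e =>
    if (e.version < requiredVersion) {
      e.value = copy(e.value)
      e.version = tableVersion
    }
    e.value
  }
}
```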

Also looping in Aljoscha and Stefan here, as they would probably have more
insight into this.

Cheers,
Gordon

On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio 
(federico.dambro...@smartlab.ws) wrote:

Hi Gordon,

I remembered that I had already seen this kind of exception once during the 
testing of the current job and fortunately I had the complete stacktrace still 
saved on my pc:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
        at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
        at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)

I don't know why the stack traces are now only printed for the outer exceptions
(handleWatermark and HeapReducingState).

So it looks like something related to the KryoSerializer. As the Kryo serializer
for Joda's DateTime I'm using JodaDateTimeSerializer, registered as follows:

import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
import org.joda.time.DateTime

env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])

I hope this helps.

Regards,
Federico

2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <federico.dambro...@smartlab.ws>:
Hi Gordon,

I'm currently using Flink 1.3.2 in local mode.

In case it helps, I noticed in the log that the complete failing task is:

2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.

val events = keyedStreamByID 
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("time").name("latest_time").uid("latest_time")


val activeStream = events
  // Serialization to JsValue
  .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
  // Global windowing; the exception should originate upstream of this
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")

val historyStream = airtrafficEvents
  // Serialization to JsValue
  .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
  // Global windowing; the exception should originate upstream of this
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
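For reference, a plain-Scala sketch of when the 20 s `latest_time` window
fires, which is the `WindowOperator.onEventTime` -> `HeapReducingState.get` path
in the full trace. The window-start arithmetic follows Flink's
`TimeWindow.getWindowStartWithOffset` with offset 0; the firing rule assumes the
default event-time trigger (fire once the watermark reaches the window's last
millisecond). `Window`, `assignTumbling` and `fires` are hypothetical names.

```scala
// Hypothetical window type; `end` is exclusive.
final case class Window(start: Long, end: Long) {
  // Last millisecond that still belongs to the window.
  def maxTimestamp: Long = end - 1
}

// Tumbling assignment with offset 0, mirroring
// TimeWindow.getWindowStartWithOffset(timestamp, 0, size).
def assignTumbling(timestampMs: Long, sizeMs: Long): Window = {
  val start = timestampMs - (timestampMs + sizeMs) % sizeMs
  Window(start, start + sizeMs)
}

// Assumed event-time firing rule: fire once the watermark reaches maxTimestamp.
// In the job, this is when the window operator reads the reducing state.
def fires(w: Window, watermarkMs: Long): Boolean = watermarkMs >= w.maxTimestamp
```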



Regards,
Federico

2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
Hi,

I’m looking into this. Could you let us know the Flink version in which the 
exceptions occurred?

Cheers,
Gordon


On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio 
(federico.dambro...@smartlab.ws) wrote:

Hi, I'm coming across these exceptions while running a pretty simple Flink job.
First one:
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:
java.io.IOException: Exception while applying ReduceFunction in reducing state
        at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


Since it looks like something goes wrong during watermark processing, I should
mention that in my case watermarks are generated in my Kafka source:

val stream = env.addSource(
  new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
    .setStartFromLatest()
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        // The parameter type must match the extractor's type parameter (Event)
        def extractTimestamp(element: Event): Long =
          element.instantValues.time.getMillis
      })
)
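As far as I know, `BoundedOutOfOrdernessTimestampExtractor` tracks the largest
timestamp seen so far and emits `max - maxOutOfOrderness` as the watermark,
never letting it go backwards. Below is a plain-Scala re-implementation of that
rule for illustration; the class name `BoundedOutOfOrdernessSketch` is
hypothetical and this is not the Flink class itself.

```scala
// Plain-Scala re-implementation of the bounded-out-of-orderness rule
// (hypothetical class, not Flink's BoundedOutOfOrdernessTimestampExtractor).
final class BoundedOutOfOrdernessSketch(maxOutOfOrdernessMs: Long) {
  // Start so that the first watermark is Long.MinValue instead of underflowing.
  private var currentMaxTimestamp = Long.MinValue + maxOutOfOrdernessMs
  private var lastEmittedWatermark = Long.MinValue

  // Called per element: remember the largest timestamp seen so far.
  def extractTimestamp(timestampMs: Long): Long = {
    if (timestampMs > currentMaxTimestamp) currentMaxTimestamp = timestampMs
    timestampMs
  }

  // Called periodically: lag the max timestamp by the bound, never moving backwards.
  def currentWatermark: Long = {
    val potential = currentMaxTimestamp - maxOutOfOrdernessMs
    if (potential >= lastEmittedWatermark) lastEmittedWatermark = potential
    lastEmittedWatermark
  }
}
```

With the 10 s bound above, a 20 s window [40 s, 60 s) can only fire after an
event with a timestamp of at least 69.999 s has been seen.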
These exceptions aren't really that informative per se and, from what I see, 
the task triggering these exceptions is the following operator:

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("timestamp").name("latest_time").uid("latest_time")
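The second exception comes from `HeapReducingState.add`, i.e. while applying
the reduce function behind `maxBy`. A plain-Scala toy of that incremental
pattern follows; `ReducingStateSketch`, `Ev` and `maxByTime` are hypothetical,
this is not Flink's code, and keeping the earlier element on ties is an
assumption. Each add combines the new element with the stored accumulator, so
only one element per key and window is kept.

```scala
import scala.collection.mutable

// Hypothetical event type standing in for the job's case class.
final case class Ev(id: String, timeMs: Long, payload: String)

// Toy reducing state (NOT Flink's HeapReducingState): one accumulator per
// (key, window), combined with the reduce function on every add.
final class ReducingStateSketch[K, W, T](reduce: (T, T) => T) {
  private val table = mutable.HashMap.empty[(K, W), T]

  def add(key: K, window: W, value: T): Unit =
    table.update((key, window), table.get((key, window)).fold(value)(acc => reduce(acc, value)))

  def get(key: K, window: W): Option[T] = table.get((key, window))
}

// maxBy-style reduce on the time field; keeping the earlier element on ties is assumed.
val maxByTime: (Ev, Ev) => Ev = (acc, next) => if (next.timeMs > acc.timeMs) next else acc
```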

What could be the problem here, in your opinion? Is it not emitting watermarks
correctly? I'm not even sure how I could reproduce these exceptions, since they
seem to happen pretty much at random.

Thank you all,
Federico D'Ambrosio



--
Federico D'Ambrosio



