Hi Federico,

I'm guessing the job is still running fine without asynchronous snapshots? I'm very 
eager to figure out what is actually going wrong with asynchronous checkpoints.

Best,
Aljoscha


> On 2. Oct 2017, at 11:57, Federico D'Ambrosio 
> <federico.dambro...@smartlab.ws> wrote:
> 
> As a followup:
> 
> the Flink job currently has an uptime of almost 24 hours, with no failed 
> checkpoints or restarts, whereas with async snapshots it would have already 
> crashed 50 or so times.
> 
> Regards,
> Federico
> 
> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio 
> <federico.dambro...@smartlab.ws>:
> Thank you very much, Gordon.
> 
> I'll try to run the job without the asynchronous snapshots first thing.
> 
> As for the Event data type: it's a case class with 2 fields, a String ID and 
> a composite case class (let's call it RealEvent) containing 3 fields of the 
> following types: Information, a case class with String fields; Coordinates, a 
> nested case class with 2 Doubles; and InstantValues, with 3 Integers and a 
> DateTime. This DateTime field in InstantValues is the one being evaluated in 
> the maxBy (via the compareTo implementations of InstantValues and RealEvent, 
> because dot notation is not working in Scala as of 1.3.2, FLINK-7629 
> <https://issues.apache.org/jira/browse/FLINK-7629>), and that was the reason I 
> had to register the JodaDateTimeSerializer with Kryo in the first place.
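> 
> For clarity, the structure is roughly the following (the field names here are 
> simplified placeholders, not the real ones):
> 
> import org.joda.time.DateTime
> 
> case class Information(source: String, description: String)
> case class Coordinates(latitude: Double, longitude: Double)
> case class InstantValues(altitude: Int, speed: Int, track: Int, time: DateTime)
>     extends Ordered[InstantValues] {
>   // ordered on the DateTime field, which is what the maxBy ends up comparing
>   override def compare(that: InstantValues): Int = time.compareTo(that.time)
> }
> case class RealEvent(information: Information,
>                      coordinates: Coordinates,
>                      instantValues: InstantValues) extends Ordered[RealEvent] {
>   override def compare(that: RealEvent): Int = instantValues.compare(that.instantValues)
> }
> case class Event(id: String, event: RealEvent)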
> 
> Regards,
> Federico
> 
> 
> 
> 
> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
> Hi,
> 
> Thanks for the extra info, it was helpful (I’m not sure why your first logs 
> didn’t have the full trace, though).
> 
> I spent some time digging through the error trace, and currently have some 
> observations I would like to go through first:
> 
> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while 
> trying to access the state and making a copy (via serialization) in the 
> CopyOnWriteStateTable.
> 2. The state that caused the exception seems to be the state of the reducing 
> window function (i.e. the maxBy). The state type should be the same as the 
> records in your `events` DataStream, which seems to be a Scala case class 
> with some nested field that requires Kryo for serialization.
> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying to 
> copy that field.
> 
> My current guess is that the internally used serializer may have been 
> incorrectly shared, which is probably why this exception happens randomly for 
> you. I recall similar issues occurring before because some KryoSerializers 
> aren't thread-safe and were incorrectly shared in Flink.
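> 
> To illustrate what I mean, here is a standalone sketch (not Flink code, and it 
> may or may not fail on any given run, since it depends on timing): two threads 
> sharing a single Kryo instance. Kryo keeps mutable per-call state (for example 
> its reference-tracking stack), so concurrent use can corrupt it and surface as 
> seemingly random errors like the ArrayIndexOutOfBoundsException in your trace.
> 
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.io.Output
> 
> object SharedKryoSketch extends App {
>   val sharedKryo = new Kryo() // one instance shared across threads: the anti-pattern
>   val payload = new java.util.ArrayList[Integer]()
>   (1 to 100).foreach(i => payload.add(Int.box(i)))
> 
>   def work(): Unit = (1 to 10000).foreach { _ =>
>     val out = new Output(4096)
>     sharedKryo.writeClassAndObject(out, payload) // not thread-safe
>     out.close()
>   }
> 
>   val threads = Seq.fill(2)(new Thread(new Runnable { def run(): Unit = work() }))
>   threads.foreach(_.start())
>   threads.foreach(_.join())
> }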
> 
> I may need some help from you to be able to look at this a bit more:
> - Is it possible for you to disable asynchronous snapshots and run this job a 
> bit more to see if the problem still occurs (a quick sketch of how to do that 
> is below)? This is mainly to rule out my guess about incorrect serializer 
> usage in the CopyOnWriteStateTable.
> - Could you let us know what the record type (case class) of your `events` 
> DataStream looks like?
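> 
> In case it helps, if you are using the heap-based FsStateBackend, something 
> like the following should turn asynchronous snapshots off (the checkpoint 
> path is just a placeholder):
> 
> import org.apache.flink.runtime.state.filesystem.FsStateBackend
> 
> // the second constructor argument is asynchronousSnapshots; false disables them
> env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", false))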
> 
> Also looping in Aljoscha and Stefan here, as they would probably have more 
> insights in this.
> 
> Cheers,
> Gordon
> 
> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio 
> (federico.dambro...@smartlab.ws) wrote:
> 
>> Hi Gordon,
>> 
>> I remembered that I had already seen this kind of exception once while 
>> testing the current job, and fortunately I still had the complete stack trace 
>> saved on my PC:
>> 
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>         at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>         at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>         at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>         at 
>> org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>         at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>         at 
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>         at 
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>         at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>         at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>         at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>         at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>         at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>         at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>         at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:748)
>> 
>> I don't know why the stack trace is now being printed only for the first 
>> parts (handleWatermark and HeapReducingState).
>> 
>> So, it looks like something to do with the KryoSerializer. As a Kryo 
>> serializer I'm using JodaDateTimeSerializer, registered as follows:
>> 
>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
>> 
>> I hope this helps.
>> 
>> Regards,
>> Federico
>> 
>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio 
>> <federico.dambro...@smartlab.ws>:
>> Hi Gordon,
>> 
>> I'm currently using Flink 1.3.2 in local mode.
>> 
>> If it's any help, I realized from the log that the complete task which is 
>> failing is:
>> 
>> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task      
>>                - latest_time -> (map_active_stream, map_history_stream) 
>> (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>> 
>> val events = keyedStreamByID
>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>   .maxBy("time").name("latest_time").uid("latest_time")
>> 
>> val activeStream = events
>>   // Serialization to JsValue
>>   .map(event => event.toMongoActiveJsValue)
>>   .name("map_active_stream").uid("map_active_stream")
>>   // Global windowing, the cause of the exception should be above
>>   .timeWindowAll(Time.seconds(10))
>>   .apply(new MongoWindow(MongoWritingType.UPDATE))
>>   .name("active_stream_window").uid("active_stream_window")
>> 
>> val historyStream = airtrafficEvents
>>   // Serialization to JsValue
>>   .map(event => event.toMongoHistoryJsValue)
>>   .name("map_history_stream").uid("map_history_stream")
>>   // Global windowing, the cause of the exception should be above
>>   .timeWindowAll(Time.seconds(10))
>>   .apply(new MongoWindow(MongoWritingType.UPDATE))
>>   .name("history_stream_window").uid("history_stream_window")
>> 
>> 
>> 
>> Regards,
>> Federico
>> 
>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>> Hi,
>> 
>> I’m looking into this. Could you let us know the Flink version in which the 
>> exceptions occurred?
>> 
>> Cheers,
>> Gordon
>> 
>> 
>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio 
>> (federico.dambro...@smartlab.ws) wrote:
>> 
>>> Hi, I'm coming across these exceptions while running a pretty simple Flink 
>>> job.
>>> The first one:
>>> java.lang.RuntimeException: Exception occurred while processing valve 
>>> output watermark:   
>>>         at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>         at 
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>         at 
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>         at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>> 
>>> The second one:
>>> java.io.IOException: Exception while applying ReduceFunction in reducing 
>>> state
>>>         at 
>>> org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>         at 
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>         at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>> 
>>> 
>>> Since it looks like something is wrong in watermark processing: in my case, 
>>> watermarks are generated in my Kafka source:
>>> val stream = env.addSource(
>>>   new FlinkKafkaConsumer010[AirTrafficEvent](topic, new JSONDeserializationSchema(), consumerConfig)
>>>     .setStartFromLatest()
>>>     .assignTimestampsAndWatermarks(
>>>       new BoundedOutOfOrdernessTimestampExtractor[AirTrafficEvent](Time.seconds(10)) {
>>>         override def extractTimestamp(element: AirTrafficEvent): Long =
>>>           element.instantValues.time.getMillis
>>>       })
>>> )
>>> These exceptions aren't really that informative per se and, from what I can 
>>> see, the task triggering them is the following operator:
>>> 
>>> val events = keyedStreamByID
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>> 
>>> What could be the problem here, in your opinion? Is it not emitting 
>>> watermarks correctly? I'm not even sure how I could reproduce these 
>>> exceptions, since they seem to happen pretty much randomly.
>>> 
>>> Thank you all,
>>> Federico D'Ambrosio
>> 
>> 
>> 
>> --
>> Federico D'Ambrosio
>> 
>> 
>> 
>> --
>> Federico D'Ambrosio
> 
> 
> 
> -- 
> Federico D'Ambrosio
> 
> 
> 
> -- 
> Federico D'Ambrosio
