I think I finally found the problem; there is already an existing bug report
for this: https://issues.apache.org/jira/browse/FLINK-7484
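
For reference, as I understand FLINK-7484: CaseClassSerializer.duplicate()
did not perform a proper deep copy of its nested serializers, so a stateful
Kryo serializer inside a case class serializer could end up shared between
the processing thread and the asynchronous checkpointing thread. A composite
serializer has to duplicate every child, roughly like this hypothetical
sketch (CompositeSerializerSketch and fieldSerializers are stand-in names,
not the actual Flink code):

import org.apache.flink.api.common.typeutils.TypeSerializer

// Hypothetical sketch: a composite serializer's duplicate() must also
// duplicate every nested serializer, otherwise a stateful child (e.g. a
// Kryo-based serializer) is shared across threads and its internal
// state can get corrupted.
class CompositeSerializerSketch[T](fieldSerializers: Array[TypeSerializer[_]]) {
  def duplicate(): CompositeSerializerSketch[T] =
    new CompositeSerializerSketch[T](fieldSerializers.map(_.duplicate()))
}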

> On 12. Oct 2017, at 18:22, Federico D'Ambrosio 
> <federico.dambro...@smartlab.ws> wrote:
> 
> Hi Aljoscha, 
> 
> yes, just like you're guessing, without asynchronous checkpoints, there has 
> been no crash so far.
> 
> Regards,
> Federico
> 
> 2017-10-12 18:08 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi Federico,
> 
> I'm guessing the job is still running fine without asynchronous snapshots?
> I'm very eager to figure out what is actually going wrong with asynchronous
> checkpoints.
> 
> Best,
> Aljoscha
> 
> 
>> On 2. Oct 2017, at 11:57, Federico D'Ambrosio
>> <federico.dambro...@smartlab.ws> wrote:
>> 
>> As a followup:
>> 
>> the Flink job currently has an uptime of almost 24 hours, with no failed
>> checkpoints or restarts, whereas with async snapshots it would have
>> already crashed 50 or so times.
>> 
>> Regards,
>> Federico
>> 
>> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio
>> <federico.dambro...@smartlab.ws>:
>> Thank you very much, Gordon.
>> 
>> I'll try to run the job without the asynchronous snapshots first thing.
>> 
>> As for the Event data type: it's a case class with two fields, a String ID
>> and a composite case class (let's call it RealEvent) containing three
>> fields of the following types: Information, a case class with String
>> fields; Coordinates, a nested case class with two Doubles; and
>> InstantValues, with three Integers and a DateTime. This DateTime field in
>> InstantValues is the one being evaluated in the maxBy (via the compareTo
>> implementations of InstantValues and RealEvent, because dot notation is
>> not working in Scala as of 1.3.2, FLINK-7629
>> <https://issues.apache.org/jira/browse/FLINK-7629>), and it was the reason
>> I had to register the JodaDateTimeSerializer with Kryo in the first place.
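>> 
>> Schematically, it's something like this (a simplified sketch; field names
>> other than `time` are placeholders):
>> 
>> import org.joda.time.DateTime
>> 
>> case class Information(name: String, description: String)
>> 
>> case class Coordinates(lat: Double, lon: Double)
>> 
>> case class InstantValues(speed: Int, altitude: Int, heading: Int, time: DateTime)
>>     extends Ordered[InstantValues] {
>>   // the DateTime field is what the maxBy comparison ultimately evaluates
>>   override def compare(that: InstantValues): Int = time.compareTo(that.time)
>> }
>> 
>> case class RealEvent(info: Information, coordinates: Coordinates, instants: InstantValues)
>>     extends Ordered[RealEvent] {
>>   override def compare(that: RealEvent): Int = instants.compare(that.instants)
>> }
>> 
>> case class Event(id: String, event: RealEvent)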
>> 
>> Regards,
>> Federico
>> 
>> 
>> 
>> 
>> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>> Hi,
>> 
>> Thanks for the extra info, it was helpful (I’m not sure why your first logs 
>> didn’t have the full trace, though).
>> 
>> I spent some time digging through the error trace, and currently have some 
>> observations I would like to go through first:
>> 
>> 1. It seems the ArrayIndexOutOfBoundsException was thrown while trying to
>> access the state and make a copy (via serialization) in the
>> CopyOnWriteStateTable.
>> 2. The state that caused the exception seems to be the state of the reducing
>> window function (i.e. the maxBy). The state type should be the same as the
>> records in your `events` DataStream, which seems to be a Scala case class
>> with some nested field that requires Kryo for serialization.
>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying
>> to copy that field.
>> 
>> My current guess is that the internally used serializer may have been
>> incorrectly shared, which is probably why this exception happens randomly
>> for you.
>> I recall similar issues occurring before because some KryoSerializers
>> aren't thread-safe and were incorrectly shared in Flink.
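>> 
>> (For context: Flink's TypeSerializer interface has a duplicate() method
>> precisely because serializers can be stateful and hence not thread-safe;
>> each thread that serializes concurrently is supposed to work on its own
>> duplicate. A rough sketch of the intended pattern; copyInNewThread is a
>> made-up helper name:)
>> 
>> import org.apache.flink.api.common.typeutils.TypeSerializer
>> 
>> def copyInNewThread[T](serializer: TypeSerializer[T], record: T): T = {
>>   // duplicate() returns a deep copy for stateful serializers (such as
>>   // Kryo-based ones); sharing the original across threads is unsafe
>>   val ownSerializer = serializer.duplicate()
>>   ownSerializer.copy(record)
>> }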
>> 
>> I may need some help from you to look into this a bit more:
>> - Is it possible for you to disable asynchronous snapshots and run this job
>> a bit more to see if the problem still occurs (see the sketch after these
>> points)? This is mainly to rule out my guess about incorrect serializer
>> usage in the CopyOnWriteStateTable.
>> - Could you let us know what the record type (case class) of your `events`
>> DataStream looks like?
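>> 
>> (If it helps, a sketch of how asynchronous snapshots can be switched off
>> for the heap-based backends in 1.3; the checkpoint URI here is just a
>> placeholder:)
>> 
>> import org.apache.flink.runtime.state.filesystem.FsStateBackend
>> 
>> // the second constructor argument toggles asynchronous snapshots
>> env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", false))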
>> 
>> Also looping in Aljoscha and Stefan here, as they would probably have more 
>> insights in this.
>> 
>> Cheers,
>> Gordon
>> 
>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio
>> (federico.dambro...@smartlab.ws) wrote:
>> 
>>> Hi Gordon,
>>> 
>>> I remembered that I had already seen this kind of exception once while
>>> testing the current job, and fortunately I still had the complete stack
>>> trace saved on my PC:
>>> 
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>>         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>         at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> 
>>> I don't know why the stack trace is now being output only for the first
>>> parts (handleWatermark and HeapReducingState).
>>> 
>>> So, it looks like something that has to do with the KryoSerializer. The
>>> Kryo serializer I'm using is JodaDateTimeSerializer (the one from the
>>> de.javakaffee kryo-serializers library), registered as follows:
>>> 
>>> import org.joda.time.DateTime
>>> import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
>>> 
>>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime],
>>>   classOf[JodaDateTimeSerializer])
>>> 
>>> I hope this could help.
>>> 
>>> Regards,
>>> Federico
>>> 
>>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio
>>> <federico.dambro...@smartlab.ws>:
>>> Hi Gordon,
>>> 
>>> I'm currently using Flink 1.3.2 in local mode.
>>> 
>>> If it's any help, I realized from the log that the complete task that is
>>> failing is:
>>> 
>>> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task
>>>     - latest_time -> (map_active_stream, map_history_stream) (1/1)
>>>     (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>> 
>>> val events = keyedStreamByID
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>   .maxBy("time").name("latest_time").uid("latest_time")
>>> 
>>> val activeStream = events
>>>   // Serialization to JsValue
>>>   .map(event => event.toMongoActiveJsValue)
>>>   .name("map_active_stream").uid("map_active_stream")
>>>   // Global windowing, the cause of the exception should be above
>>>   .timeWindowAll(Time.seconds(10))
>>>   .apply(new MongoWindow(MongoWritingType.UPDATE))
>>>   .name("active_stream_window").uid("active_stream_window")
>>> 
>>> val historyStream = events
>>>   // Serialization to JsValue
>>>   .map(event => event.toMongoHistoryJsValue)
>>>   .name("map_history_stream").uid("map_history_stream")
>>>   // Global windowing, the cause of the exception should be above
>>>   .timeWindowAll(Time.seconds(10))
>>>   .apply(new MongoWindow(MongoWritingType.UPDATE))
>>>   .name("history_stream_window").uid("history_stream_window")
>>> 
>>> 
>>> 
>>> Regards,
>>> Federico
>>> 
>>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>>> Hi,
>>> 
>>> I’m looking into this. Could you let us know the Flink version in which the 
>>> exceptions occurred?
>>> 
>>> Cheers,
>>> Gordon
>>> 
>>> 
>>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio
>>> (federico.dambro...@smartlab.ws) wrote:
>>> 
>>>> Hi, I'm coming across these exceptions while running a pretty simple
>>>> Flink job.
>>>> First one:
>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>> 
>>>> The second one:
>>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>> 
>>>> 
>>>> Since it looks like something is wrong in watermark processing, here is
>>>> how watermarks are generated in my case, in my Kafka source:
>>>> 
>>>> val stream = env.addSource(
>>>>   new FlinkKafkaConsumer010[AirTrafficEvent](topic,
>>>>       new JSONDeserializationSchema(), consumerConfig)
>>>>     .setStartFromLatest()
>>>>     .assignTimestampsAndWatermarks(
>>>>       new BoundedOutOfOrdernessTimestampExtractor[AirTrafficEvent](Time.seconds(10)) {
>>>>         def extractTimestamp(element: AirTrafficEvent): Long =
>>>>           element.instantValues.time.getMillis
>>>>       })
>>>> )
>>>> These exceptions aren't really that informative per se and, from what I
>>>> can see, the task triggering them is the following operator:
>>>> 
>>>> val events = keyedStreamByID
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>>> 
>>>> What could be the problem here, in your opinion? Is it not emitting
>>>> watermarks correctly? I'm not even sure how I could reproduce these
>>>> exceptions, since they seem to happen pretty much randomly.
>>>> 
>>>> Thank you all,
>>>> Federico D'Ambrosio
>>> 
>>> 
>>> 
>>> --
>>> Federico D'Ambrosio
>>> 
>>> 
>>> 
>>> --
>>> Federico D'Ambrosio
>> 
>> 
>> 
>> -- 
>> Federico D'Ambrosio
>> 
>> 
>> 
>> -- 
>> Federico D'Ambrosio
> 
> 
> 
> 
> -- 
> Federico D'Ambrosio
