Hi,

I’m looking into this. Could you let us know the Flink version in which the 
exceptions occurred?

Cheers,
Gordon

On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio 
(federico.dambro...@smartlab.ws) wrote:

Hi, I'm coming across these Exceptions while running a pretty simple flink job.
First one:
java.lang.RuntimeException: Exception occurred while processing valve output 
watermark:  
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:
java.io.IOException: Exception while applying ReduceFunction in reducing state
        at 
org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
        at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


Since it looks like something is wrong in Watermark processing, in my case 
Watermarks are generated in my KafkaSource:

val stream = env.addSource(
  new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), 
consumerConfig)
    .setStartFromLatest()
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        def extractTimestamp(element: AirTrafficEvent): Long =
          element.instantValues.time.getMillis
      })
)
These exceptions aren't really that informative per se and, from what I see, 
the task triggering these exceptions is the following operator:

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("timestamp").name("latest_time").uid("latest_time")

What could be the problem here in your opinion? It's not emitting watermarks 
correctly? I'm not even how I could reproduce this exceptions, since it looks 
like they happen pretty much randomly.

Thank you all,
Federico D'Ambrosio

Reply via email to