Hi,

The exception stack you posted simply means that the next operator in the
chain failed to process the output watermark.
There should be another exception, which would explain why some operator
was closed / failed and eventually leading to the above exception.
That would provide more insight to exactly why your job is failing.

Cheers,
Gordon

On Tue, Mar 17, 2020 at 11:27 PM aj <ajainje...@gmail.com> wrote:

> Hi,
> I am running a streaming job with generating watermark like this :
>
> public static class SessionAssigner implements 
> AssignerWithPunctuatedWatermarks<GenericRecord> {
>     @Override
>     public long extractTimestamp(GenericRecord record, long 
> previousElementTimestamp) {
>         long timestamp = (long) record.get("event_ts");
>         LOGGER.info("timestamp----", timestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(GenericRecord record, long 
> extractedTimestamp) {
>         // simply emit a watermark with every event
>         LOGGER.info("extractedTimestamp ", extractedTimestamp);
>         return new Watermark(extractedTimestamp);
>     }
> }
>
> Please help me understand what this exception means:
>
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
>     at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:216)
>     at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(
> StatusWatermarkValve.java:189)
>     at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processElement(StreamOneInputProcessor.java:169)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:143)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:406)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.
> collect(TimestampedCollector.java:51)
>     at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 137)
>     at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 116)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:50)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:32)
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.emitWindowContents(WindowOperator.java:549)
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.onEventTime(WindowOperator.java:457)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
> .advanceWatermark(InternalTimerServiceImpl.java:276)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
> .advanceWatermark(InternalTimeServiceManager.java:128)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .processWatermark(AbstractStreamOperator.java:784)
>     at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:213)
>     ... 10 more
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

Reply via email to