Thanks, Zhijiang and Gordon.

I will check the logs to find out more.
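
For anyone reading this later: since the posted trace only shows the wrapper messages, one way to surface the underlying failure is to walk the cause chain down to the innermost exception. The snippet below is only a minimal sketch using the plain JDK. The class name RootCause, the helper rootCause, and the innermost IllegalStateException are hypothetical stand-ins, not anything from Flink or from my job.

```java
public class RootCause {

    /** Follows getCause() links until the innermost exception is reached. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // Stop at a null cause or a self-referencing cause to avoid looping forever.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical innermost failure; the real one would come from the logs.
        Throwable inner = new IllegalStateException("hypothetical real failure");
        // Stand-ins carrying the two wrapper messages seen in the posted trace.
        Throwable chained = new RuntimeException(
                "Could not forward element to next operator", inner);
        Throwable outer = new RuntimeException(
                "Exception occurred while processing valve output watermark:", chained);

        // Prints the message of the innermost exception instead of the wrappers.
        System.out.println(rootCause(outer).getMessage());
    }
}
```

This only helps when the real failure is actually attached as a cause. If it was logged separately, the job master and task manager logs are still the place to look.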

On Wed, Mar 18, 2020 at 1:44 PM Zhijiang <> wrote:

> I agree with Gordon's explanation below.
> Besides that, you could also check the job master's log, which will
> probably show the specific exception that caused this failure.
> I was wondering whether we should improve
> ExceptionInChainedOperatorException to also include the message from the
> wrapped real exception,
> so that users can see the root cause directly, rather than only the
> current message "Could not forward element to next operator".
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From:Tzu-Li (Gordon) Tai <>
> Send Time:2020 Mar. 18 (Wed.) 00:01
> To:aj <>
> Cc:user <>
> Subject:Re: Help me understand this Exception
> Hi,
> The exception stack you posted simply means that the next operator in the
> chain failed to process the output watermark.
> There should be another exception that explains why some operator
> was closed or failed, which eventually led to the above exception.
> That would provide more insight into exactly why your job is failing.
> Cheers,
> Gordon
> On Tue, Mar 17, 2020 at 11:27 PM aj <> wrote:
> Hi,
> I am running a streaming job that generates watermarks like this:
> public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
>     @Override
>     public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>         long timestamp = (long) record.get("event_ts");
>"timestamp----", timestamp);
>         return timestamp;
>     }
>     @Override
>     public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>         // simply emit a watermark with every event
>"extractedTimestamp ", extractedTimestamp);
>         return new Watermark(extractedTimestamp);
>     }
> }
> Please help me understand what this exception means:
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
>     at
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
>     at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(
>     at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.inputWatermark(
>     at
> .processElement(
>     at
> .processInput(
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>     at
> .java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>     at org.apache.flink.runtime.taskmanager.Task.doRun(
>     at
>     at
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(
> 727)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(
> 705)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.
> collect(
>     at com.bounce.test.SessionProcessor$2.process(
> 137)
>     at com.bounce.test.SessionProcessor$2.process(
> 116)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.emitWindowContents(
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.onEventTime(
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
> .advanceWatermark(
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
> .advanceWatermark(
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .processWatermark(
>     at
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
>     ... 10 more
> --
> Thanks & Regards,
> Anuj Jain

Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07

