Thanks, Zhijiang and Gordon.

I will see the logs to find out more.

On Wed, Mar 18, 2020 at 1:44 PM Zhijiang <wangzhijiang...@aliyun.com> wrote:

> Agree with Gordon's below explanation!
>
> Besides that, maybe you can also check the job master's log which might
> probably show the specific exception to cause this failure.
>
> I was thinking whether it is necessary to improve
> ExceptionInChainedOperatorException to also provide the message from the
> wrapped real exception,
> then users can easily get the root cause directly, not only for the
> current message "Could not forward element to next operator".
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From:Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> Send Time:2020 Mar. 18 (Wed.) 00:01
> To:aj <ajainje...@gmail.com>
> Cc:user <user@flink.apache.org>
> Subject:Re: Help me understand this Exception
>
> Hi,
>
> The exception stack you posted simply means that the next operator in the
> chain failed to process the output watermark.
> There should be another exception, which would explain why some operator
> was closed / failed and eventually leading to the above exception.
> That would provide more insight to exactly why your job is failing.
>
> Cheers,
> Gordon
>
> On Tue, Mar 17, 2020 at 11:27 PM aj <ajainje...@gmail.com> wrote:
> Hi,
> I am running a streaming job with generating watermark like this :
>
> public static class SessionAssigner implements 
> AssignerWithPunctuatedWatermarks<GenericRecord> {
>     @Override
>     public long extractTimestamp(GenericRecord record, long 
> previousElementTimestamp) {
>         long timestamp = (long) record.get("event_ts");
>         LOGGER.info("timestamp----", timestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(GenericRecord record, long 
> extractedTimestamp) {
>         // simply emit a watermark with every event
>         LOGGER.info("extractedTimestamp ", extractedTimestamp);
>         return new Watermark(extractedTimestamp);
>     }
> }
>
> Please help me understand what this exception means:
>
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
>     at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:216)
>     at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(
> StatusWatermarkValve.java:189)
>     at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processElement(StreamOneInputProcessor.java:169)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:143)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:406)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.
> collect(TimestampedCollector.java:51)
>     at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 137)
>     at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 116)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:50)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:32)
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.emitWindowContents(WindowOperator.java:549)
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.onEventTime(WindowOperator.java:457)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
> .advanceWatermark(InternalTimerServiceImpl.java:276)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
> .advanceWatermark(InternalTimeServiceManager.java:128)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .processWatermark(AbstractStreamOperator.java:784)
>     at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:213)
>     ... 10 more
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Reply via email to