I agree with Gordon's explanation below.

In addition, you can check the job master's log, which will probably show 
the specific exception that caused this failure.

I have been wondering whether we should improve 
ExceptionInChainedOperatorException so that it also includes the message of 
the wrapped, real exception.
Users could then see the root cause directly, rather than only the current 
message "Could not forward element to next operator".
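
To illustrate the idea, here is a minimal sketch in plain Java, with no Flink 
dependency, that walks the getCause() chain of an exception and appends the 
innermost exception to the outer message. RootCauseFinder, rootCause and 
describe are hypothetical names used only for illustration; they are not part 
of Flink, and this is not how ExceptionInChainedOperatorException is 
implemented.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Objects;
import java.util.Set;

// Hypothetical helper (not a Flink class): finds the innermost cause of a
// wrapped exception so that it can be shown next to the outer message.
final class RootCauseFinder {

    private RootCauseFinder() {}

    /** Follows getCause() links until the innermost exception is reached. */
    static Throwable rootCause(Throwable t) {
        Objects.requireNonNull(t, "t");
        // Identity-based set guards against (unusual) cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    /** Returns the outer message with the root cause's class and message appended. */
    static String describe(Throwable t) {
        Throwable root = rootCause(t);
        if (root == t) {
            return String.valueOf(t.getMessage());
        }
        return t.getMessage()
                + " (root cause: " + root.getClass().getName()
                + ": " + root.getMessage() + ")";
    }
}
```

Applied to an exception shaped like the one in this thread, describe would 
keep the outer message and add the class name and message of the innermost 
exception, if one is attached as a cause.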

Best,
Zhijiang


------------------------------------------------------------------
From:Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Send Time:2020 Mar. 18 (Wed.) 00:01
To:aj <ajainje...@gmail.com>
Cc:user <user@flink.apache.org>
Subject:Re: Help me understand this Exception

Hi,

The exception stack you posted simply means that the next operator in the chain 
failed to process the output watermark.
There should be another exception that explains why some operator was 
closed or failed, which eventually led to the exception above.
That would provide more insight into exactly why your job is failing.

Cheers,
Gordon
On Tue, Mar 17, 2020 at 11:27 PM aj <ajainje...@gmail.com> wrote:

Hi,
I am running a streaming job that generates watermarks like this:

public static class SessionAssigner
        implements AssignerWithPunctuatedWatermarks<GenericRecord> {

    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        // Assumes "event_ts" is stored as a Long (epoch milliseconds).
        long timestamp = (long) record.get("event_ts");
        // Assumes an SLF4J-style LOGGER from the enclosing class; the "{}"
        // placeholder is needed for the value to appear in the log line.
        LOGGER.info("timestamp---- {}", timestamp);
        return timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
        // simply emit a watermark with every event
        LOGGER.info("extractedTimestamp {}", extractedTimestamp);
        return new Watermark(extractedTimestamp);
    }
}
Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
    ... 10 more


-- 
Thanks & Regards,
 Anuj Jain




