Hi, The exception stack you posted simply means that the next operator in the chain failed to process the output watermark. There should be another exception, which would explain why some operator was closed / failed and eventually leading to the above exception. That would provide more insight to exactly why your job is failing.
Cheers, Gordon On Tue, Mar 17, 2020 at 11:27 PM aj <ajainje...@gmail.com> wrote: > Hi, > I am running a streaming job with generating watermark like this : > > public static class SessionAssigner implements > AssignerWithPunctuatedWatermarks<GenericRecord> { > @Override > public long extractTimestamp(GenericRecord record, long > previousElementTimestamp) { > long timestamp = (long) record.get("event_ts"); > LOGGER.info("timestamp----", timestamp); > return timestamp; > } > > @Override > public Watermark checkAndGetNextWatermark(GenericRecord record, long > extractedTimestamp) { > // simply emit a watermark with every event > LOGGER.info("extractedTimestamp ", extractedTimestamp); > return new Watermark(extractedTimestamp); > } > } > > Please help me understand what this exception means: > > java.lang.RuntimeException: Exception occurred while processing valve > output watermark: > at org.apache.flink.streaming.runtime.io. > StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark( > StreamOneInputProcessor.java:216) > at org.apache.flink.streaming.runtime.streamstatus. > StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels( > StatusWatermarkValve.java:189) > at org.apache.flink.streaming.runtime.streamstatus. > StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) > at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor > .processElement(StreamOneInputProcessor.java:169) > at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor > .processInput(StreamOneInputProcessor.java:143) > at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput( > StreamTask.java:279) > at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask > .java:301) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( > StreamTask.java:406) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: > 727) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: > 705) > at org.apache.flink.streaming.api.operators.TimestampedCollector. > collect(TimestampedCollector.java:51) > at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java: > 137) > at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java: > 116) > at org.apache.flink.streaming.runtime.operators.windowing.functions. > InternalIterableProcessWindowFunction.process( > InternalIterableProcessWindowFunction.java:50) > at org.apache.flink.streaming.runtime.operators.windowing.functions. > InternalIterableProcessWindowFunction.process( > InternalIterableProcessWindowFunction.java:32) > at org.apache.flink.streaming.runtime.operators.windowing. > WindowOperator.emitWindowContents(WindowOperator.java:549) > at org.apache.flink.streaming.runtime.operators.windowing. > WindowOperator.onEventTime(WindowOperator.java:457) > at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl > .advanceWatermark(InternalTimerServiceImpl.java:276) > at org.apache.flink.streaming.api.operators.InternalTimeServiceManager > .advanceWatermark(InternalTimeServiceManager.java:128) > at org.apache.flink.streaming.api.operators.AbstractStreamOperator > .processWatermark(AbstractStreamOperator.java:784) > at org.apache.flink.streaming.runtime.io. > StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark( > StreamOneInputProcessor.java:213) > ... 10 more > > -- > Thanks & Regards, > Anuj Jain > > > > <http://www.cse.iitm.ac.in/%7Eanujjain/> >