Yun Gao created FLINK-22928: ------------------------------- Summary: Unexpected exception happens in RecordWriter when stopping-with-savepoint Key: FLINK-22928 URL: https://issues.apache.org/jira/browse/FLINK-22928 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.11.2 Reporter: Yun Gao
{code:java} 2021-06-05 10:02:51 java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture .java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java: 1928) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close( StreamOperatorWrapper.java:130) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close( StreamOperatorWrapper.java:134) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close( StreamOperatorWrapper.java:80) at org.apache.flink.streaming.runtime.tasks.OperatorChain .closeOperators(OperatorChain.java:302) at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke( StreamTask.java:576) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask .java:544) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput .emitWatermark(OperatorChain.java:642) at org.apache.flink.streaming.api.operators.CountingOutput .emitWatermark(CountingOutput.java:41) at org.apache.flink.streaming.runtime.operators. TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark( TimestampsAndWatermarksOperator.java:165) at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks .onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69) at org.apache.flink.streaming.runtime.operators. TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java: 125) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper .lambda$closeOperator$5(StreamOperatorWrapper.java:205) at org.apache.flink.streaming.runtime.tasks. StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing( StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper .closeOperator(StreamOperatorWrapper.java:203) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper .lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) at org.apache.flink.streaming.runtime.tasks. StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing( StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java: 78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl .tryYield(MailboxExecutorImpl.java:90) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ... 9 more Caused by: java.lang.RuntimeException at org.apache.flink.streaming.runtime.io.RecordWriterOutput .emitWatermark(RecordWriterOutput.java:123) at org.apache.flink.streaming.runtime.tasks. OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java: 762) at org.apache.flink.streaming.api.operators.CountingOutput .emitWatermark(CountingOutput.java:41) at org.apache.flink.streaming.api.operators.AbstractStreamOperator .processWatermark(AbstractStreamOperator.java:570) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput .emitWatermark(OperatorChain.java:638) ... 21 more Caused by: java.lang.IllegalStateException at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179 ) at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append( BufferBuilder.java:83) at org.apache.flink.runtime.io.network.api.serialization. SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java: 90) at org.apache.flink.runtime.io.network.api.writer.RecordWriter .copyFromSerializerToTargetChannel(RecordWriter.java:136) at org.apache.flink.runtime.io.network.api.writer. ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java: 80) at org.apache.flink.streaming.runtime.io.RecordWriterOutput .emitWatermark(RecordWriterOutput.java:121) ... 25 more {code} The issue seems to happen when stoping a job with stop-with-savepoint. It is reported by used inĀ [the user ML|https://lists.apache.org/thread.html/r1e595ceac4e7c6ac6ec473108cceb35c8ba3032084fc83dc8779af3f%40%3Cuser.flink.apache.org%3E]. -- This message was sent by Atlassian Jira (v8.3.4#803005)