[ https://issues.apache.org/jira/browse/FLINK-21028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17289031#comment-17289031 ]
Piotr Nowojski commented on FLINK-21028:
----------------------------------------

merged commit 19ceee0 into apache:master

> Streaming application didn't stop properly
> ------------------------------------------
>
>                 Key: FLINK-21028
>                 URL: https://issues.apache.org/jira/browse/FLINK-21028
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.11.2, 1.12.2, 1.13.0
>            Reporter: Theo Diefenthal
>            Assignee: Piotr Nowojski
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> I have a Flink job running on YARN with a disjoint graph, i.e. a single job
> contains two independent and isolated pipelines.
> From time to time, I stop the job with a savepoint like so:
> {code:java}
> flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS --yarnapplicationId=${FLINK_YARN_APPID} ${ID}{code}
> A few days ago, this job suddenly did not stop properly as usual but ran into
> what looks like a race condition.
> On the CLI, the stop command failed with a plain timeout:
> {code:java}
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "f23290bf5fb0ecd49a4455e4a65f2eb6".
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
>     at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
>     ... 9 more{code}
>
> The root of the problem, however, is that a TaskManager threw an exception
> during shutdown. This led to a restart of (part of) the pipeline and put it
> back into the RUNNING state, so the CLI stop command timed out (as the job
> was, at least partially, running again). The exception in the logs, which
> looks like a race condition to me, is:
> {code:java}
> 2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED.
> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>     at org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>     ... 13 more
> Caused by: java.lang.RuntimeException
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>     ... 25 more
> Caused by: java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>     at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>     ... 29 more{code}
> I already raised this bug on the user mailing list, where the conclusion was
> to open a ticket here. Original thread on the user mailing list:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bugs-in-Streaming-job-stopping-Weird-graceful-stop-restart-for-disjoint-job-graph-td40610.html
> edit:
> In Flink 1.12.x, this bug can probably lead to a corrupted data stream and
> all kinds of deserialisation errors in the downstream task.
> The same bug can also leave any code that sleeps interruptibly (regardless of
> whether it is Flink's network stack, user code, or a third-party library) in
> an invalid state.
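Both traces above end in a blocking `CompletableFuture.get`: on the client, `CliFrontend` waits with a timeout and gets a `TimeoutException` because the job went back to RUNNING instead of finishing; on the TaskManager, `StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator` gets an `ExecutionException` wrapping the operator failure. The following is a minimal, Flink-independent sketch of those future outcomes using only `java.util.concurrent`. It is not how `CliFrontend` is implemented; `StopTimeoutSketch`, `waitForSavepoint`, and the savepoint path are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StopTimeoutSketch {

    /**
     * Hypothetical stand-in for a client waiting for the savepoint path of a
     * stop-with-savepoint request. Returns a short description of the outcome.
     */
    static String waitForSavepoint(CompletableFuture<String> savepointPath, long timeoutMillis) {
        try {
            // Blocks until the future completes or the timeout elapses.
            String path = savepointPath.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "stopped with savepoint " + path;
        } catch (TimeoutException e) {
            // Nobody completed the future in time, e.g. because the job is RUNNING again.
            return "timeout";
        } catch (ExecutionException e) {
            // The future completed exceptionally; get() exposes the original failure as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // A future that is never completed models a job that never finishes stopping.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(waitForSavepoint(neverCompleted, 100)); // timeout

        // A future completed with a (hypothetical) path models a successful stop.
        CompletableFuture<String> done = CompletableFuture.completedFuture("/savepoints/sp-1");
        System.out.println(waitForSavepoint(done, 100)); // stopped with savepoint /savepoints/sp-1

        // A future completed exceptionally surfaces as an ExecutionException.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("Could not forward element to next operator"));
        System.out.println(waitForSavepoint(failed, 100)); // failed: Could not forward element to next operator
    }
}
```

The timeout case is the one the reporter saw on the CLI: the client cannot tell a slow stop from a job that restarted, it only sees that the future was not completed in time.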
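The last point of the edit, that the bug can leave code which sleeps interruptibly in an invalid state, is a general Java hazard and not specific to Flink. A minimal, Flink-independent sketch, assuming a hypothetical two-step update (`InterruptedUpdateSketch`, `TwoStepState`, `written`, and `committed` are illustrative names): the worker completes step 1, is interrupted while blocked in `Thread.sleep`, and never performs step 2, so the shared state stays half-updated. This does not reproduce the actual Flink code path.

```java
import java.util.concurrent.CountDownLatch;

public class InterruptedUpdateSketch {

    /** Hypothetical shared state: both fields are supposed to change together. */
    static final class TwoStepState {
        volatile int written = 0;   // step 1, e.g. data appended
        volatile int committed = 0; // step 2, e.g. data made visible

        boolean isConsistent() {
            return written == committed;
        }
    }

    /** Starts a worker that does step 1, sleeps interruptibly, then step 2; interrupts it in between. */
    static TwoStepState runAndInterrupt() throws InterruptedException {
        TwoStepState state = new TwoStepState();
        CountDownLatch firstStepDone = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            state.written = 1;                 // step 1
            firstStepDone.countDown();
            try {
                Thread.sleep(10_000);          // hypothetical stand-in for any interruptible blocking call
                state.committed = 1;           // step 2, skipped when interrupted
            } catch (InterruptedException e) {
                // The interrupt aborts the operation half-way; nothing rolls back step 1.
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        firstStepDone.await();                 // make sure step 1 has happened
        worker.interrupt();                    // e.g. a concurrent cancellation
        worker.join();
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        TwoStepState state = runAndInterrupt();
        System.out.println("written=" + state.written + " committed=" + state.committed
                + " consistent=" + state.isConsistent());
        // prints: written=1 committed=0 consistent=false
    }
}
```

Restoring the interrupt flag in the `catch` block is the usual idiom, but it does not repair the half-finished update: the caller still has to treat the state as invalid or roll it back.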
-- This message was sent by Atlassian Jira (v8.3.4#803005)