Theo Diefenthal created FLINK-21028:
---------------------------------------

             Summary: Streaming application didn't stop properly 
                 Key: FLINK-21028
                 URL: https://issues.apache.org/jira/browse/FLINK-21028
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.11.2
            Reporter: Theo Diefenthal


I have a Flink job running on YARN with a disjoint graph, i.e. a single job 
contains two independent and isolated pipelines.

From time to time, I stop the job with a savepoint like so:
{code:java}
flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS \
  --yarnapplicationId=${FLINK_YARN_APPID} ${ID}
{code}
A few days ago, the job did not stop as it usually does. Instead, it appears 
to have hit a race condition.

On the CLI, the stop command simply timed out:
{code:java}
org.apache.flink.util.FlinkException: Could not stop with a savepoint job "f23290bf5fb0ecd49a4455e4a65f2eb6".
 at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
 at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
 at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
 at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.TimeoutException
 at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
 ... 9 more{code}
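
For readers less familiar with this kind of trace: the TimeoutException is thrown by a timed CompletableFuture.get, which means the client stopped waiting before the stop-with-savepoint result arrived. The following is a minimal plain-JDK sketch of that pattern, not Flink code. The class name, the method awaitStop, the savepoint path and the timeout values are all hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK illustration only; none of these names are Flink APIs.
class StopTimeoutSketch {

    // Waits on a (hypothetical) stop-with-savepoint future with a bounded get().
    // Returns the savepoint path, or "TIMEOUT" if the future did not complete in time.
    static String awaitStop(CompletableFuture<String> stopFuture, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        try {
            return stopFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        }
    }

    public static void main(String[] args) throws Exception {
        // Case 1: the job stopped, so the future already holds a savepoint path.
        CompletableFuture<String> stopped =
                CompletableFuture.completedFuture("/savepoints/sp-1");
        System.out.println(awaitStop(stopped, 100));

        // Case 2: nothing ever completes the future, so the timed get gives up.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(awaitStop(neverCompleted, 100));
    }
}
```

In the second case the caller only learns that the wait expired, not why the future was never completed, which matches how little the CLI output above reveals.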
 

The root of the problem, however, is an exception that a taskmanager threw 
during shutdown. It led to part of the pipeline being restarted and put back 
into the running state, so the stop command on the console timed out because 
the job was (partially) running again. The exception in the logs, which looks 
like a race condition to me, is:
{code:java}
2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED.
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
 at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
 at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
 at org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
 at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
 ... 13 more
Caused by: java.lang.RuntimeException
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
 at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
 at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
 ... 25 more
Caused by: java.lang.IllegalStateException
 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
 at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
 at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
 at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
 at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
 ... 29 more{code}
I already raised this on the user mailing list, where the conclusion was to 
open a ticket here. The original thread is: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bugs-in-Streaming-job-stopping-Weird-graceful-stop-restart-for-disjoint-job-graph-td40610.html



