[ https://issues.apache.org/jira/browse/FLINK-21028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281875#comment-17281875 ]

Piotr Nowojski commented on FLINK-21028:
----------------------------------------

[~TheoD], could you post the full logs? Please include those from the Job
Manager and at least from the two Task Managers where, as you wrote:
{quote}
two tasks had something that looks like a racecondition
{quote}
Frankly, I stared at the code and the provided stack trace for the better part
of the last couple of hours, and the only potential explanation I could come up
with is that someone interrupted the task's thread (1) while it was waiting for
a buffer, and that this {{InterruptedException}} was later swallowed by someone
else (2). The problem might be that {{RecordWriter#finishBufferBuilder}}
finishes the buffer builder but leaves the reference to it in place instead of
setting it to {{null}}; the next write then appends to the already finished
builder, which is detected as an {{IllegalStateException}}. If that's the case,
this problem was accidentally fixed in FLINK-19297.
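A minimal, hypothetical sketch of the suspected sequence, for illustration only. It is not Flink code: {{SimpleWriter}}, {{SimpleBufferBuilder}}, {{BufferProvider}} and the 4-byte {{CAPACITY}} are invented stand-ins loosely modeled on {{RecordWriter}} and {{BufferBuilder}}. An {{InterruptedException}} thrown by the second buffer request simulates the interruption (1), and the caller swallowing it simulates (2). Whether the "clear on finish" variant corresponds to what FLINK-19297 actually changed is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, heavily simplified model of the suspected race; the names only
// loosely mirror Flink's RecordWriter/BufferBuilder. This is NOT Flink code.
public class StaleBufferBuilderSketch {

    static final int CAPACITY = 4; // assumed buffer size in bytes

    /** Stand-in for BufferBuilder: append() is illegal once the builder is finished. */
    static final class SimpleBufferBuilder {
        private int written;
        private boolean finished;

        /** Writes as many bytes as still fit and returns how many were written. */
        int append(int bytes) {
            if (finished) { // same spirit as a checkState(!isFinished()) guard
                throw new IllegalStateException("append() on a finished buffer builder");
            }
            int n = Math.min(bytes, CAPACITY - written);
            written += n;
            return n;
        }

        void finish() { finished = true; }
    }

    /** Stand-in for a blocking buffer request that can be interrupted. */
    interface BufferProvider {
        SimpleBufferBuilder requestBlocking() throws InterruptedException;
    }

    static final class SimpleWriter {
        private final BufferProvider provider;
        private final boolean clearOnFinish; // true = reference is nulled after finish()
        private SimpleBufferBuilder current;

        SimpleWriter(BufferProvider provider, boolean clearOnFinish) {
            this.provider = provider;
            this.clearOnFinish = clearOnFinish;
        }

        /** Copies a record into buffers, spilling into new buffers while it does not fit. */
        void emit(int recordBytes) throws InterruptedException {
            SimpleBufferBuilder builder = getBufferBuilder();
            int remaining = recordBytes - builder.append(recordBytes);
            while (remaining > 0) {
                finishBufferBuilder();
                builder = requestNewBufferBuilder(); // (1) may be interrupted here
                remaining -= builder.append(remaining);
            }
        }

        private SimpleBufferBuilder getBufferBuilder() throws InterruptedException {
            return current != null ? current : requestNewBufferBuilder();
        }

        private SimpleBufferBuilder requestNewBufferBuilder() throws InterruptedException {
            SimpleBufferBuilder fresh = provider.requestBlocking(); // if this throws, 'current' is unchanged
            current = fresh;
            return fresh;
        }

        private void finishBufferBuilder() {
            current.finish();
            if (clearOnFinish) {
                current = null;
            }
        }
    }

    /** Runs the scenario and returns "ok" or the simple name of the exception hit. */
    static String run(boolean clearOnFinish) {
        AtomicInteger requests = new AtomicInteger();
        // The second buffer request is "interrupted" while waiting for a buffer.
        BufferProvider provider = () -> {
            if (requests.incrementAndGet() == 2) {
                throw new InterruptedException("simulated interrupt");
            }
            return new SimpleBufferBuilder();
        };
        SimpleWriter writer = new SimpleWriter(provider, clearOnFinish);
        try {
            writer.emit(6); // needs two buffers of CAPACITY bytes each
        } catch (InterruptedException swallowed) {
            // (2) the interruption is swallowed and processing continues, e.g. closing
        }
        try {
            writer.emit(1); // e.g. a final watermark emitted while closing operators
            return "ok";
        } catch (IllegalStateException | InterruptedException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("reference kept:    " + run(false));
        System.out.println("reference cleared: " + run(true));
    }
}
```

With {{clearOnFinish = false}}, the second {{emit}} picks up the stale, already finished builder and fails in {{append}}, analogous to the {{checkState}} failure in {{BufferBuilder.append}} in the stack trace. With {{clearOnFinish = true}}, it requests a fresh builder and succeeds.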

(1) If it was Flink cancelling those two tasks, there must have been some
reason behind this cancellation attempt: some other, third task would have had
to fail for a completely different reason. Alternatively, are you sure that
neither you nor anyone else cancelled the job while it was being stopped? Or
maybe something else entirely, such as a watchdog script, sent SIGINT to the
Flink process?

The second part (2) might actually be somewhat expected. We first interrupt the
source function thread and only later the task thread. Maybe during this window
{{StreamTask}} had already started closing itself and hit this illegal state
problem?

TL;DR: I'm not sure, but my only theory is a combination of something else
interrupting a task and a minor bug that was accidentally fixed in FLINK-19297.

> Streaming application didn't stop properly 
> -------------------------------------------
>
>                 Key: FLINK-21028
>                 URL: https://issues.apache.org/jira/browse/FLINK-21028
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.11.2
>            Reporter: Theo Diefenthal
>            Priority: Major
>
> I have a Flink job running on YARN with a disjoint graph, i.e. a single job 
> contains two independent and isolated pipelines.
> From time to time, I stop the job with a savepoint like so:
> {code:java}
> flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS --yarnapplicationId=${FLINK_YARN_APPID} ${ID}{code}
> A few days ago, this job suddenly didn't stop as usual but ran into what looks
> like a race condition.
> On the CLI, the stop command simply timed out:
> {code:java}
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "f23290bf5fb0ecd49a4455e4a65f2eb6".
>  at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
>  at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
>  at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
>  at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
>  at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>  at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
>  at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
>  ... 9 more{code}
>  
> The root of the problem, however, is that one TaskManager threw an exception
> during shutdown, which led to restarting (part of) the pipeline and putting it
> back into the RUNNING state. Thus, the stop command on the console timed out,
> as the job was (partially) running again. The exception in the logs, which
> looks like a race condition to me, is:
> {code:java}
> 2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED.
> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>  at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>  at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>  at org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
>  at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>  ... 13 more
> Caused by: java.lang.RuntimeException
>  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>  at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>  at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>  ... 25 more
> Caused by: java.lang.IllegalStateException
>  at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>  at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>  at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>  at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>  ... 29 more{code}
> I already raised a question about this bug on the user mailing list, where the
> conclusion was to just open a ticket here. Original thread on the user mailing list:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bugs-in-Streaming-job-stopping-Weird-graceful-stop-restart-for-disjoint-job-graph-td40610.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
