[ https://issues.apache.org/jira/browse/FLINK-20781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255564#comment-17255564 ]

Jiangjie Qin commented on FLINK-20781:
--------------------------------------

[~xintongsong] Yes, unfortunately all the methods are subject to this problem.

The correct fix here has to be more systematic. The root cause of the problem
is that the mailbox task shutdown sequence is neither thread-safe nor atomic.
For example, as [~kezhuw] pointed out in FLINK-20389, the chained futures are
not guaranteed to be executed consecutively. We will have to construct the
entire future chain in one shot and pass the whole chain to the mailbox.
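To make the ordering argument concrete, here is a minimal sketch that models the mailbox as a single FIFO queue drained on one thread. That model, the class name `MailboxOrderingSketch` and the mail labels are assumptions for illustration only; Flink's real mailbox is more involved. When the shutdown sequence is split into chained async stages, each stage becomes a separate mail, so a notification enqueued in the meantime can run between the stages. Submitting the whole sequence as one mail keeps the steps consecutive.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical model: the "mailbox" is a FIFO queue drained by a single thread.
class MailboxOrderingSketch {

    // Runs every queued mail in FIFO order on the calling thread.
    static void drain(Queue<Runnable> queue) {
        Runnable mail;
        while ((mail = queue.poll()) != null) {
            mail.run();
        }
    }

    // Shutdown split into two chained async stages: each stage is enqueued as its own mail.
    static List<String> chainedStages() {
        List<String> log = new ArrayList<>();
        Queue<Runnable> queue = new ArrayDeque<>();
        Executor mailbox = queue::add;

        CompletableFuture.runAsync(() -> log.add("close-step-1"), mailbox)
                .thenRunAsync(() -> log.add("close-step-2"), mailbox);
        // Another mail (e.g. a checkpoint-abort notification) arrives before step 1 runs.
        mailbox.execute(() -> log.add("abort-notification"));

        // Step 2 is only enqueued once step 1 completes, i.e. behind the abort notification.
        drain(queue);
        return log;
    }

    // Whole shutdown sequence built up front and submitted as a single mail.
    static List<String> oneShot() {
        List<String> log = new ArrayList<>();
        Queue<Runnable> queue = new ArrayDeque<>();
        Executor mailbox = queue::add;

        mailbox.execute(() -> {
            log.add("close-step-1");
            log.add("close-step-2");
        });
        mailbox.execute(() -> log.add("abort-notification"));

        drain(queue);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(chainedStages()); // [close-step-1, abort-notification, close-step-2]
        System.out.println(oneShot());       // [close-step-1, close-step-2, abort-notification]
    }
}
```

The sketch only illustrates why chaining stages through the mailbox is not atomic; it says nothing about how the shutdown sequence should actually be restructured in Flink.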

BTW, personally I don't think this issue is a blocker for release-1.12.1. The
problem only occurs occasionally when a task exits, and when it does, the job
simply fails over. I am happy to put a band-aid here for the
{{SourceOperator}}, but I don't think we need to make such a long-pending
issue in the mailbox task shutdown sequence a blocker for release 1.12.1.
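A band-aid of the kind mentioned above could be as small as a null check. The sketch below uses a hypothetical, simplified operator class (`GuardedSourceOperatorSketch` with a hypothetical `SketchReader` interface), not Flink's actual {{SourceOperator}} or reader API. It assumes, as the stack trace suggests, that {{close()}} and the abort notification both run on the same mailbox thread, so no extra synchronization is needed. It only masks the symptom; the ordering problem in the shutdown sequence remains.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a source operator; this is not Flink's SourceOperator.
class GuardedSourceOperatorSketch {

    // Hypothetical minimal reader interface standing in for the real source reader.
    interface SketchReader {
        void notifyCheckpointAborted(long checkpointId);

        void close();
    }

    private SketchReader sourceReader;

    GuardedSourceOperatorSketch(SketchReader reader) {
        this.sourceReader = reader;
    }

    // Closes the reader and clears the reference, as the issue describes for sourceReader.
    void close() {
        if (sourceReader != null) {
            sourceReader.close();
            sourceReader = null;
        }
    }

    // Band-aid: an abort notification that arrives after close() becomes a no-op
    // instead of dereferencing a null reader.
    void notifyCheckpointAborted(long checkpointId) {
        if (sourceReader == null) {
            return;
        }
        sourceReader.notifyCheckpointAborted(checkpointId);
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        GuardedSourceOperatorSketch operator = new GuardedSourceOperatorSketch(new SketchReader() {
            @Override
            public void notifyCheckpointAborted(long checkpointId) {
                events.add("aborted-" + checkpointId);
            }

            @Override
            public void close() {
                events.add("closed");
            }
        });
        operator.notifyCheckpointAborted(1L);
        operator.close();
        // A late mail processed while closing (cf. tryYield in the stack trace) is ignored.
        operator.notifyCheckpointAborted(2L);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [aborted-1, closed]
    }
}
```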

> UnalignedCheckpointITCase failure caused by NullPointerException
> ----------------------------------------------------------------
>
>                 Key: FLINK-20781
>                 URL: https://issues.apache.org/jira/browse/FLINK-20781
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.12.0, 1.13.0
>            Reporter: Matthias
>            Assignee: Jiangjie Qin
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.12.1
>
>
> {{UnalignedCheckpointITCase}} fails due to a {{NullPointerException}} (e.g. in [this build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=11083&view=logs&j=59c257d0-c525-593b-261d-e96a86f1926b&t=b93980e3-753f-5433-6a19-13747adae66a&l=8798]):
> {code:java}
> [ERROR] Tests run: 10, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 152.186 s <<< FAILURE! - in org.apache.flink.test.checkpointing.UnalignedCheckpointITCase
> [ERROR] execute[Parallel cogroup, p = 10](org.apache.flink.test.checkpointing.UnalignedCheckpointITCase)  Time elapsed: 34.869 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>       at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
>       at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>       at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>       at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>       at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
>       at akka.dispatch.OnComplete.internal(Future.scala:264)
>       at akka.dispatch.OnComplete.internal(Future.scala:261)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>       at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>       at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>       at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>       at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>       at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>       at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>       at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>       at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>       at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>       at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>       at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>       at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>       at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, backoffTimeMS=100)
>       at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>       at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:221)
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:214)
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:205)
>       at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:577)
>       at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>       at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:420)
>       at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>       ... 4 more
> Caused by: java.lang.NullPointerException
>       at org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointAborted(SourceOperator.java:299)
>       at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:311)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$12(StreamTask.java:968)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:977)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91)
>       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
>       at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:128)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>       at java.lang.Thread.run(Thread.java:748) {code}
> This might be caused by work related to FLINK-20492, specifically commit [168124f|https://github.com/apache/flink/commit/168124f99c75e873adc81437c700f85f703e2248#diff-eba14821fb3e96f6f20e3116ceab893c2f18cff605b177dad485aadc43ac4f56R240], which sets {{sourceReader}} to {{null}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
