[ 
https://issues.apache.org/jira/browse/FLINK-32348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738852#comment-17738852
 ] 

Jiabao Sun commented on FLINK-32348:
------------------------------------

Hi [~martijnvisser]

I investigated and reproduced the issue, and found that when the 
`AsyncCheckpointRunnable` encounters a `CancellationException`, the task never 
stops as it should.

I think this problem may be related to 
[FLINK-25902|https://issues.apache.org/jira/browse/FLINK-25902].
The root cause still needs further investigation.


{code:sh}
00:30:26,533 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering Checkpoint 4 for job 79765d8c304b804a1adbd3677bc39708 failed due to org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager received a checkpoint request for unknown task 6585a08f46e2d380ebe0ac7fde3739a7_cbc357ccb763df2852fee8c4fc7d55f2_1_1. Failure reason: Task local checkpoint failure.
00:30:26,533 [    Checkpoint Timer] WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 4 for job 79765d8c304b804a1adbd3677bc39708. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager received a checkpoint request for unknown task 6585a08f46e2d380ebe0ac7fde3739a7_cbc357ccb763df2852fee8c4fc7d55f2_1_1. Failure reason: Task local checkpoint failure.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:1025) ~[flink-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[?:?]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) ~[?:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) ~[?:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) ~[?:?]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) ~[?:?]
        at akka.actor.ActorCell.invoke(ActorCell.scala:547) ~[?:?]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_372]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_372]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_372]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_372]
00:30:26,554 [AsyncOperations-thread-1] INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Sink: Data stream collect sink (1/1)#1 - asynchronous part of checkpoint 4 could not be completed.
java.util.concurrent.CancellationException: null
        at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_372]
        at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_372]
        at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544) ~[flink-core-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:60) ~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]
00:30:26,559 [jobmanager-io-thread-2] WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Received late message for now expired checkpoint attempt 4 from task 6585a08f46e2d380ebe0ac7fde3739a7_cbc357ccb763df2852fee8c4fc7d55f2_0_1 of job 79765d8c304b804a1adbd3677bc39708 at c5c049a8-08be-4625-9431-5e9d1f75ba01 @ localhost (dataPort=41367).
00:30:26,609 [    Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1686443426609 for job 79765d8c304b804a1adbd3677bc39708.
00:30:26,899 [jobmanager-io-thread-2] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 5 for job 79765d8c304b804a1adbd3677bc39708 (2327692 bytes, checkpointDuration=290 ms, finalizationTime=0 ms).
00:30:26,906 [SourceCoordinator-Source: MongoDB-Source] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 5 as completed for source Source: MongoDB-Source.
00:30:26,906 [    Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 6 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1686443426906 for job 79765d8c304b804a1adbd3677bc39708.
00:30:27,170 [jobmanager-io-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 6 for job 79765d8c304b804a1adbd3677bc39708 (2327692 bytes, checkpointDuration=263 ms, finalizationTime=1 ms).
00:30:27,170 [    Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointRequestDecider [] - checkpoint request time in queue: 161
00:30:27,171 [SourceCoordinator-Source: MongoDB-Source] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 6 as completed for source Source: MongoDB-Source.
00:30:27,171 [    Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 7 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1686443427170 for job 79765d8c304b804a1adbd3677bc39708.
00:30:27,424 [jobmanager-io-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 7 for job 79765d8c304b804a1adbd3677bc39708 (2327692 bytes, checkpointDuration=254 ms, finalizationTime=0 ms).
00:30:27,427 [SourceCoordinator-Source: MongoDB-Source] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 7 as completed for source Source: MongoDB-Source.
{code}
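
For reference, the top two frames of the `CancellationException` trace (`FutureTask.report` and `FutureTask.get`) are plain JDK behaviour: calling `get()` on a `FutureTask` that has already been cancelled throws `CancellationException`. Below is a minimal sketch of just that mechanism. It does not reproduce the hanging task, and the `runIfNotDoneAndGet` helper is a hypothetical stand-in named after the method in the trace, not a copy of Flink's implementation; the class name and the `"snapshot-state"` value are also made up for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

class CancelledSnapshotFutureDemo {

    // Hypothetical stand-in (an assumption, not Flink code):
    // run the future on the calling thread if it is not done yet, then wait for its result.
    static <T> T runIfNotDoneAndGet(RunnableFuture<T> future)
            throws ExecutionException, InterruptedException {
        if (!future.isDone()) {
            future.run();
        }
        // For a cancelled FutureTask, get() throws CancellationException.
        return future.get();
    }

    // Returns the future's result, or "cancelled" if get() threw CancellationException.
    static String outcome(boolean cancelFirst) {
        FutureTask<String> snapshot = new FutureTask<>(() -> "snapshot-state");
        if (cancelFirst) {
            // A cancelled task counts as done, so the helper never runs it.
            snapshot.cancel(true);
        }
        try {
            return runIfNotDoneAndGet(snapshot);
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(false));
        System.out.println(outcome(true));
    }
}
```

So the exception itself only says that the snapshot future was cancelled before the asynchronous part of checkpoint 4 could finish; why the task does not stop afterwards is a separate question.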



> MongoDB tests are flaky and time out
> ------------------------------------
>
>                 Key: FLINK-32348
>                 URL: https://issues.apache.org/jira/browse/FLINK-32348
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / MongoDB
>            Reporter: Martijn Visser
>            Priority: Critical
>              Labels: test-stability
>
> https://github.com/apache/flink-connector-mongodb/actions/runs/5232649632/jobs/9447519651#step:13:39307



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
