[ https://issues.apache.org/jira/browse/FLINK-32411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738344#comment-17738344 ]

Rui Fan commented on FLINK-32411:
---------------------------------

Merged

<master: 1.18> 876b1ec262f6c442e4bf87c9a5f83811e90e2805

1.17: a828cd275b55c566ede4ba9878a9c1d2b5997d6e

1.16: 9fc79ac2c3a96e7edb2b911bdc7b6ceaf0ea731f

> SourceCoordinator thread leaks when job recovers from checkpoint
> ----------------------------------------------------------------
>
>                 Key: FLINK-32411
>                 URL: https://issues.apache.org/jira/browse/FLINK-32411
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.16.2, 1.17.1
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2023-06-22-11-12-35-747.png
>
>
> SourceCoordinator threads leak when a job recovers from a checkpoint. As the 
> following figure shows, there are:
>  * 2 SourceCoordinator threads for SlowNumberSequenceSource
>  * 2 SourceCoordinator threads for FastNumberSequenceSource
> !image-2023-06-22-11-12-35-747.png|width=889,height=225!
> h1. Root cause:
>  # When the ExecutionJobVertex of the source is initialized, 
> RecreateOnResetOperatorCoordinator creates the SourceCoordinator ([code link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60]).
>  # When the job recovers from a checkpoint, 
> [RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120]
>  closes the old coordinator and creates a new one.
>  # [SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271]
>  only closes the SourceCoordinatorContext if the coordinator has been started, so the 
> SourceCoordinatorContext of the old, never-started coordinator is never closed.
>  # The SourceCoordinatorContext creates threads in its 
> [constructor|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L118],
>  so it should be closed even if the SourceCoordinator is never started.
>  
> The call stacks that create the SourceCoordinator:
> {code:java}
> // Create the first SourceCoordinator
> "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
>       at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
>       - locked <0x1f02> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
>       at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
>       at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
>       at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>       at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>       at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
>       at org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
>       at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
>       at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
>       at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
>       at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
>       at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366)
>       at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
>       at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156)
>       at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122)
>       at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:378)
>       at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:355)
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory$$Lambda$751.371794887.get(Unknown Source:-1)
>       at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>       at org.apache.flink.util.function.FunctionUtils$$Lambda$752.1103993612.get(Unknown Source:-1)
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1590)
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:-1)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
>
> // Create the second SourceCoordinator when the job recovers from a checkpoint
> "Thread-7@8239" daemon prio=5 tid=0x4f nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
>       at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
>       - locked <0x2063> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:150)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$$Lambda$900.2091837632.accept(Unknown Source:-1)
>       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>       at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
>       at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils$$Lambda$895.1145152008.run(Unknown Source:-1)
>       at java.lang.Thread.run(Thread.java:748){code}
>  
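
The root cause in the issue comes down to a close() that is gated on a "started" flag while the resource it should release (a thread pool) is allocated eagerly in a constructor. Below is a minimal Java sketch of that pattern under stated assumptions: Context and Coordinator are hypothetical stand-ins for SourceCoordinatorContext and SourceCoordinator, the reset is modelled as "close the old coordinator, then create and start a new one", and the "fixed" variant simply follows the expectation in root-cause item 4 (close the context even if the coordinator was never started). It is not the code of the merged commits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified model of the leak; none of these classes are Flink's API. */
public class CoordinatorLeakSketch {

    /** Stand-in for SourceCoordinatorContext: its thread is created in the constructor. */
    static final class Context implements AutoCloseable {
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                runnable -> {
                    Thread t = new Thread(runnable, "coordinator-sketch");
                    t.setDaemon(true); // daemon, so a leaked thread cannot block JVM exit in this demo
                    return t;
                });

        Context() {
            executor.prestartAllCoreThreads(); // the worker thread now exists, before any start()
        }

        @Override
        public void close() {
            executor.shutdownNow();
        }
    }

    /** Stand-in for SourceCoordinator. */
    static final class Coordinator implements AutoCloseable {
        final Context context = new Context();
        private final boolean closeEvenIfNotStarted;
        private boolean started;

        Coordinator(boolean closeEvenIfNotStarted) {
            this.closeEvenIfNotStarted = closeEvenIfNotStarted;
        }

        void start() {
            started = true;
        }

        @Override
        public void close() {
            // Gated variant (the bug): the context is only closed after start().
            // Fixed variant: the context is closed whether or not start() was called.
            if (started || closeEvenIfNotStarted) {
                context.close();
            }
        }
    }

    /**
     * Creates a coordinator (vertex initialization), then resets before it was
     * started: close the old one, create and start a new one. Returns how many
     * contexts still have an executor that was never shut down.
     */
    static int liveContextsAfterReset(boolean fixed) {
        List<Coordinator> created = new ArrayList<>();
        Coordinator current = new Coordinator(fixed); // initial creation, never started
        created.add(current);

        current.close(); // resetToCheckpoint: close the old coordinator...
        current = new Coordinator(fixed); // ...and create a new one
        created.add(current);
        current.start();

        int live = 0;
        for (Coordinator c : created) {
            if (!c.context.executor.isShutdown()) {
                live++;
            }
        }
        current.close(); // the active coordinator was started, so this always closes its context
        return live;
    }

    public static void main(String[] args) {
        System.out.println("gated close: " + liveContextsAfterReset(false)); // 2 (one leaked)
        System.out.println("fixed close: " + liveContextsAfterReset(true)); // 1 (only the active one)
    }
}
```

In the gated variant, every reset that happens before start() leaves one more context with a live thread, which is consistent with the duplicated SourceCoordinator threads shown in the figure.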



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
