[ https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810745#comment-17810745 ]
Matthias Pohl edited comment on FLINK-34227 at 1/25/24 8:34 AM:
----------------------------------------------------------------

*Call Hierarchy*
{code:bash}
# "INFO: Connecting to ResourceManager [...]" printed
JobMaster.tryConnectToResourceManager() (org.apache.flink.runtime.jobmaster)
JobMaster.reconnectToResourceManager(Exception) (org.apache.flink.runtime.jobmaster)
JobMaster.disconnectResourceManager(ResourceManagerId, Exception) (org.apache.flink.runtime.jobmaster)
# "INFO: Disconnect job manager [...]" NOT printed
ResourceManager.closeJobManagerConnection(JobID, ResourceRequirementHandling, Exception) (org.apache.flink.runtime.resourcemanager)
ResourceManager.removeJob(JobID, Exception) (org.apache.flink.runtime.resourcemanager)
ResourceManager.disconnectJobManager(JobID, JobStatus, Exception) (org.apache.flink.runtime.resourcemanager)
# "INFO: Close ResourceManager connection" printed
JobMaster.dissolveResourceManagerConnection(EstablishedResourceManagerConnection, Exception) (org.apache.flink.runtime.jobmaster)
JobMaster.closeResourceManagerConnection(Exception) (org.apache.flink.runtime.jobmaster)
JobMaster.disconnectTaskManagerResourceManagerConnections(Exception) (org.apache.flink.runtime.jobmaster)
JobMaster.stopJobExecution(Exception) (org.apache.flink.runtime.jobmaster)
JobMaster.onStop() (org.apache.flink.runtime.jobmaster)
RpcEndpoint.internalCallOnStop() (org.apache.flink.runtime.rpc)
terminate(PekkoRpcActor<?>, ClassLoader) in StartedState in PekkoRpcActor (org.apache.flink.runtime.rpc.pekko)
PekkoRpcActor.handleControlMessage(ControlMessages) (org.apache.flink.runtime.rpc.pekko)
PekkoRpcActor.createReceive() (org.apache.flink.runtime.rpc.pekko)
{code}

*Findings*
Reasons that could prevent the disconnect log message (and, as a consequence, further traversal of the call tree):
* {{ResourceManager#jobManagerRegistrations}} no longer contains the job ([ResourceManager:1082|https://github.com/apache/flink/blob/ab9445aca56e7d139e8fd9bcc23e5f7e06288e66/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L1082])
** {{ResourceManager#jobManagerRegistrations}} is updated in three different locations:
*** When registering a JobMaster with the ResourceManager ([ResourceManager#registerJobMasterInternal|https://github.com/apache/flink/blob/ab9445aca56e7d139e8fd9bcc23e5f7e06288e66/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L930])
**** This would be revealed through the log message [INFO: Registering job manager|https://github.com/apache/flink/blob/ab9445aca56e7d139e8fd9bcc23e5f7e06288e66/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L388]
*** When clearing the internal state in [ResourceManager#stopResourceManagerServices|https://github.com/apache/flink/blob/ab9445aca56e7d139e8fd9bcc23e5f7e06288e66/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L350]
**** This only happens if the ResourceManager gets restarted or stopped
*** When closing the JobManager connection ([ResourceManager#closeJobManagerConnection|https://github.com/apache/flink/blob/ab9445aca56e7d139e8fd9bcc23e5f7e06288e66/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L1080])
**** This is the code path we're in; it should actually have removed the job, but didn't

Possible reasons why the JobMaster connected again:
* [handleResourceManagerConnectionLoss|https://github.com/apache/flink/blob/7b9b4e53a59ab8f4f2a99a6e162a794d264f7daf/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1523] triggers a reconnect and is itself called from the following two places:
** [JobMaster#ResourceManagerHeartbeatListener#handleResourceConnectionLoss|https://github.com/apache/flink/blob/7b9b4e53a59ab8f4f2a99a6e162a794d264f7daf/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1514], which would produce the log message "INFO: The heartbeat of ResourceManager ..."
** [JobMaster#ResourceManagerHeartbeatListener#notifyTargetUnreachable|https://github.com/apache/flink/blob/7b9b4e53a59ab8f4f2a99a6e162a794d264f7daf/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1535], which would produce the log message "INFO: ResourceManager with id ? is not longer reachable."
* [JobMaster#notifyOfNewResourceManagerLeader|https://github.com/apache/flink/blob/7b9b4e53a59ab8f4f2a99a6e162a794d264f7daf/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1145]
** A leader change of the ResourceManager can be ruled out:
*** No logs related to leader election are available
*** The ResourceManager session ID {{891f46bbb398d49ad91e1dde0bee410c}} stays the same between both test runs

> Job doesn't disconnect from ResourceManager
> -------------------------------------------
>
>                 Key: FLINK-34227
>                 URL: https://issues.apache.org/jira/browse/FLINK-34227
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.18.1
>            Reporter: Matthias Pohl
>            Assignee: Matthias Pohl
>            Priority: Critical
>              Labels: github-actions, test-stability
>         Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x00007fcccc4b7000 nid=0x24ec0 waiting on condition [0x00007fccce1eb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000000bdd52618> (a java.util.concurrent.CompletableFuture$Signaller)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>         at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>         at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>         at org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}
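The first finding in the comment above hinges on a guard in the disconnect path: the "Disconnect job manager" log line, and the further call back into the JobMaster, only happen if the job is still present in the registrations. The following Java sketch is a hypothetical, heavily simplified model of that reasoning, not Flink's code: the class name {{RegistrationModel}}, its methods, and the use of plain Strings for job IDs and JobMaster addresses are all assumptions made for illustration. It shows that a second close, or a close after the state was cleared, takes the silent path.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, heavily simplified model of the registration bookkeeping described in the
 * findings. None of these names are Flink APIs; Strings stand in for JobID and the gateway.
 */
class RegistrationModel {
    // Plays the role of ResourceManager#jobManagerRegistrations: job ID -> JobMaster address.
    private final Map<String, String> jobManagerRegistrations = new HashMap<>();
    // Captured "log" lines so the observable behavior can be checked.
    final List<String> log = new ArrayList<>();

    // Location 1: registering a JobMaster adds an entry and leaves a log trace.
    void registerJobMaster(String jobId, String jobMasterAddress) {
        log.add("INFO: Registering job manager " + jobMasterAddress + " for job " + jobId);
        jobManagerRegistrations.put(jobId, jobMasterAddress);
    }

    // Location 2: stopping or restarting the ResourceManager clears all entries.
    void stopServices() {
        jobManagerRegistrations.clear();
    }

    // Location 3: closing the connection removes the entry. Only if an entry was present is
    // the disconnect logged and the JobMaster notified. Returns whether it would be notified.
    boolean closeJobManagerConnection(String jobId) {
        String jobMasterAddress = jobManagerRegistrations.remove(jobId);
        if (jobMasterAddress == null) {
            // Silent path: no "Disconnect job manager" line and no callback to the JobMaster.
            return false;
        }
        log.add("INFO: Disconnect job manager " + jobMasterAddress + " for job " + jobId);
        return true;
    }

    public static void main(String[] args) {
        RegistrationModel rm = new RegistrationModel();
        rm.registerJobMaster("job-1", "jm@host");
        System.out.println(rm.closeJobManagerConnection("job-1")); // true: entry was present
        System.out.println(rm.closeJobManagerConnection("job-1")); // false: entry already removed
        rm.log.forEach(System.out::println);
    }
}
```

In this model, an absent disconnect log line is consistent with either a missing registration or a cleared state, which is why the comment enumerates every place where the registrations are modified.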
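The second part of the comment argues by elimination: each known reconnect trigger leaves a distinct trace, namely a log line for the two heartbeat-listener callers of {{handleResourceManagerConnectionLoss}} and a changed ResourceManager session ID for {{notifyOfNewResourceManagerLeader}}. The sketch below writes that argument down as a hypothetical helper; {{ReconnectTriggerCheck}} and {{possibleTriggers}} are not Flink APIs, the log fragments are copied from the comment without being checked against the Flink sources, and the sample log lines in {{main}} are illustrative input rather than lines from the attached logs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a Flink API) listing which reconnect triggers left a trace. */
class ReconnectTriggerCheck {
    // Log fragments as quoted in the comment; not verified against the Flink sources.
    static final String HEARTBEAT_TIMEOUT = "The heartbeat of ResourceManager";
    static final String TARGET_UNREACHABLE = "is not longer reachable";

    static List<String> possibleTriggers(
            List<String> logLines, String rmSessionIdBefore, String rmSessionIdAfter) {
        List<String> candidates = new ArrayList<>();
        // Caller 1 of handleResourceManagerConnectionLoss: heartbeat listener, timeout message.
        if (logLines.stream().anyMatch(line -> line.contains(HEARTBEAT_TIMEOUT))) {
            candidates.add("heartbeat timeout");
        }
        // Caller 2 of handleResourceManagerConnectionLoss: target unreachable.
        if (logLines.stream().anyMatch(line -> line.contains(TARGET_UNREACHABLE))) {
            candidates.add("target unreachable");
        }
        // notifyOfNewResourceManagerLeader: per the comment, a leader change would show up
        // as a different ResourceManager session ID.
        if (!rmSessionIdBefore.equals(rmSessionIdAfter)) {
            candidates.add("new ResourceManager leader");
        }
        return candidates;
    }

    public static void main(String[] args) {
        // Illustrative input only, not lines taken from the attached CI logs.
        List<String> logLines = Arrays.asList(
                "INFO: Close ResourceManager connection ...",
                "INFO: Connecting to ResourceManager ...");
        String sessionId = "891f46bbb398d49ad91e1dde0bee410c";
        System.out.println(possibleTriggers(logLines, sessionId, sessionId)); // prints []
    }
}
```

An empty result only means that none of the listed triggers left a trace in the given lines; it does not identify the actual cause.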