[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581063#comment-16581063 ] ASF GitHub Bot commented on FLINK-9936: --- tillrohrmann closed pull request #6451: [FLINK-9936] Resource manager connect to mesos after leadership granted. . URL: https://github.com/apache/flink/pull/6451 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index e24214d28c1..b2606e46fbc 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -221,10 +221,14 @@ protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriv // /** -* Starts the Mesos-specifics. +* Do nothing and all work has been moved to on leadership granted callback. 
*/ @Override protected void initialize() throws ResourceManagerException { + } + + @Override + protected void onLeaderShipGranted() throws Exception { // create and start the worker store try { this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig, getRpcService().getExecutor()); @@ -283,7 +287,14 @@ protected void initialize() throws ResourceManagerException { connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor); schedulerDriver.start(); - LOG.info("Mesos resource manager initialized."); + LOG.info("Mesos resource manager started."); + } + + @Override + protected void onLeaderShipRevoked() throws Exception { + workerStore.stop(false); + schedulerDriver.stop(true); + disconnected(new Disconnected()); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index a992632b666..081da27424d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -899,6 +899,12 @@ public void grantLeadership(final UUID newLeaderSessionID) { setFencingToken(newResourceManagerId); + try { + onLeaderShipGranted(); + } catch (Exception e) { + onFatalError(e); + } + slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()); getRpcService().execute( @@ -919,6 +925,12 @@ public void revokeLeadership() { clearState(); + try { + onLeaderShipRevoked(); + } catch (Exception e) { + onFatalError(e); + } + setFencingToken(null); slotManager.suspend(); @@ -946,6 +958,18 @@ public void handleError(final Exception exception) { */ protected abstract void initialize() throws ResourceManagerException; + /** +* Called when leadership is granted. +* @throws Exception which occurs during granting leadership and causes the resource manager to fail. 
+*/ + protected void onLeaderShipGranted() throws Exception {} + + /** +* Called when leadership is revoked. +* @throws Exception which occurs during revoking leadership and causes the resource manager to fail. +*/ + protected void onLeaderShipRevoked() throws Exception {} + /** * The framework specific code to deregister the application. This should report the * application's final status and shut down the resource manager cleanly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above
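PR #6451 moves the Mesos start-up out of initialize() and into overridable hooks that grantLeadership and revokeLeadership invoke. As a result, the scheduler driver connects only after the fencing token has been set. The sketch below shows that template-method structure using hypothetical toy classes rather than Flink's ResourceManager. Only the hook names and the routing of hook exceptions to onFatalError follow the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class LeadershipHooksDemo {

    /** Simplified base class; only the hook structure follows PR #6451, the rest is illustrative. */
    abstract static class ToyResourceManager {
        final List<String> events = new ArrayList<>();
        UUID fencingToken;

        final void grantLeadership(UUID newLeaderSessionID) {
            fencingToken = newLeaderSessionID; // the token is set before the hook runs
            try {
                onLeaderShipGranted();
            } catch (Exception e) {
                onFatalError(e);
            }
            events.add("slotManager.start");
        }

        final void revokeLeadership() {
            try {
                onLeaderShipRevoked();
            } catch (Exception e) {
                onFatalError(e);
            }
            fencingToken = null;
            events.add("slotManager.suspend");
        }

        void onFatalError(Throwable t) {
            events.add("fatal: " + t.getMessage());
        }

        // No-op defaults: only framework-specific subclasses need to override them.
        protected void onLeaderShipGranted() throws Exception {}

        protected void onLeaderShipRevoked() throws Exception {}
    }

    /** Hypothetical Mesos-like subclass that connects only once a fencing token exists. */
    static class ToyMesosResourceManager extends ToyResourceManager {
        @Override
        protected void onLeaderShipGranted() {
            events.add("schedulerDriver.start, token set: " + (fencingToken != null));
        }

        @Override
        protected void onLeaderShipRevoked() {
            events.add("schedulerDriver.stop");
        }
    }

    static List<String> run() {
        ToyMesosResourceManager rm = new ToyMesosResourceManager();
        rm.grantLeadership(UUID.randomUUID());
        rm.revokeLeadership();
        return rm.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because both hooks default to no-ops, ResourceManager subclasses other than the Mesos one need no changes.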
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581062#comment-16581062 ] ASF GitHub Bot commented on FLINK-9936: --- tillrohrmann commented on issue #6451: [FLINK-9936] Resource manager connect to mesos after leadership granted. . URL: https://github.com/apache/flink/pull/6451#issuecomment-413195708 Subsumed by #6464 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Mesos resource manager unable to connect to master after failover
> -
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
> Issue Type: Bug
> Components: Mesos, Scheduler
> Affects Versions: 1.5.0, 1.5.1, 1.6.0
> Reporter: Renjie Liu
> Assignee: Gary Yao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> When deployed in Mesos session cluster mode, the connection monitor keeps
> reporting that it is unable to connect to Mesos after a restart. In fact, the
> scheduler driver has already connected to the Mesos master, but the Connected
> message is lost. Leadership has not been granted yet and the fencing token is
> not set, so the RPC service ignores the Connected message. Therefore we
> should connect to the Mesos master only after leadership has been granted.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
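The failure described in the report can be reproduced in miniature. The sketch below assumes a toy endpoint, the hypothetical ToyFencedEndpoint, that silently drops fenced messages while no fencing token is set. This matches how the report describes the RPC service treating the Connected message, but the code does not reproduce Flink's actual RPC implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class FencingDemo {

    /** Hypothetical stand-in for a fenced RPC endpoint; not Flink's fencing implementation. */
    static class ToyFencedEndpoint {
        private UUID fencingToken; // null until leadership is granted
        final List<String> handled = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();

        void setFencingToken(UUID token) {
            fencingToken = token;
        }

        /** Fenced messages are only processed while a fencing token is set. */
        void receive(String message) {
            if (fencingToken == null) {
                dropped.add(message); // silently ignored, nobody retries it
            } else {
                handled.add(message);
            }
        }
    }

    /** Simulates the scheduler driver reporting "Connected" before or after leadership is granted. */
    static ToyFencedEndpoint run(boolean connectAfterLeadership) {
        ToyFencedEndpoint rm = new ToyFencedEndpoint();
        if (connectAfterLeadership) {
            rm.setFencingToken(UUID.randomUUID()); // leadership first
            rm.receive("Connected");
        } else {
            rm.receive("Connected"); // driver started during initialize(), before leadership
            rm.setFencingToken(UUID.randomUUID());
        }
        return rm;
    }

    public static void main(String[] args) {
        System.out.println("connect before leadership, dropped: " + run(false).dropped);
        System.out.println("connect after leadership, handled: " + run(true).handled);
    }
}
```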
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568272#comment-16568272 ] ASF GitHub Bot commented on FLINK-9936: --- asfgit closed pull request #6464: [FLINK-9936][mesos] Mesos resource manager unable to connect to master after failover URL: https://github.com/apache/flink/pull/6464 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index e24214d28c1..bc281348fa4 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -84,6 +84,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import scala.Option; @@ -138,6 +139,8 @@ final Map workersInLaunch; final Map workersBeingReturned; + private MesosConfiguration initializedMesosConfig; + public MesosResourceManager( // base class RpcService rpcService, @@ -220,9 +223,6 @@ protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriv // Resource Manager overrides // - /** -* Starts the Mesos-specifics. 
-*/ @Override protected void initialize() throws ResourceManagerException { // create and start the worker store @@ -233,9 +233,7 @@ protected void initialize() throws ResourceManagerException { throw new ResourceManagerException("Unable to initialize the worker store.", e); } - // register with Mesos - // TODO : defer connection until RM acquires leadership - + // Prepare to register with Mesos Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo() .clone() .setCheckpoint(true); @@ -251,49 +249,86 @@ protected void initialize() throws ResourceManagerException { throw new ResourceManagerException("Unable to recover the framework ID.", e); } - MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo); + initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo); MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig); + + this.selfActor = createSelfActor(); + + // configure the artifact server to serve the TM container artifacts + try { + LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec); + } + catch (IOException e) { + throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e); + } + } + + @Override + protected CompletableFuture prepareLeadershipAsync() { + Preconditions.checkState(initializedMesosConfig != null); + schedulerDriver = initializedMesosConfig.createDriver( new MesosResourceManagerSchedulerCallback(), false); // create supporting actors - selfActor = createSelfActor(); connectionMonitor = createConnectionMonitor(); launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor); reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver); taskMonitor = createTaskMonitor(schedulerDriver); - // recover state - try { - recoverWorkers(); - } catch (Exception e) { - throw new ResourceManagerException("Unable to recover Mesos worker state.", e); - } + return 
getWorkersAsync().thenApplyAsync((tasksFromPreviousAttempts) -> { + // recover state + recoverWorkers(tasksFromPreviousAttempts); - // configure the artifact server to serve the TM container artifacts - try { -
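The merged diff splits the Mesos start-up into two parts. One-time work (worker store, framework info, self actor, artifact server) stays in initialize(). prepareLeadershipAsync(), guarded by Preconditions.checkState(initializedMesosConfig != null), creates a new scheduler driver and the supporting actors and recovers workers each time it runs. The sketch below models only that split and the guard, using a hypothetical class and a String placeholder for MesosConfiguration.

```java
import java.util.concurrent.CompletableFuture;

class InitVsLeadershipDemo {
    private String initializedConfig; // placeholder for initializedMesosConfig, set once
    private int driversCreated;       // one scheduler driver per leadership session

    void initialize() {
        initializedConfig = "framework-info-with-recovered-id"; // hypothetical value
    }

    CompletableFuture<Void> prepareLeadershipAsync() {
        if (initializedConfig == null) {
            // same intent as Preconditions.checkState(initializedMesosConfig != null)
            throw new IllegalStateException("initialize() must run before leadership is granted");
        }
        driversCreated++; // a fresh driver instead of reusing one from a previous session
        return CompletableFuture.completedFuture(null);
    }

    static int run() {
        InitVsLeadershipDemo rm = new InitVsLeadershipDemo();
        rm.initialize();                    // once per process
        rm.prepareLeadershipAsync().join(); // first grant
        rm.prepareLeadershipAsync().join(); // assumed: called again on a later grant
        return rm.driversCreated;
    }

    static boolean failsWithoutInitialize() {
        try {
            new InitVsLeadershipDemo().prepareLeadershipAsync();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("drivers created: " + run());
        System.out.println("guard trips without initialize(): " + failsWithoutInitialize());
    }
}
```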
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568133#comment-16568133 ] ASF GitHub Bot commented on FLINK-9936: --- GJL commented on a change in pull request #6464: [FLINK-9936][mesos] Mesos resource manager unable to connect to master after failover URL: https://github.com/apache/flink/pull/6464#discussion_r207522805 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -892,30 +896,48 @@ protected void onFatalError(Throwable t) { */ @Override public void grantLeadership(final UUID newLeaderSessionID) { - runAsyncWithoutFencing( - () -> { - final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID); - - log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId); + final CompletableFuture acceptLeadershipFuture = CompletableFuture.supplyAsync( + () -> tryAcceptLeadership(newLeaderSessionID), + getUnfencedMainThreadExecutor()).thenCompose(Function.identity()); + + final CompletableFuture confirmationFuture = acceptLeadershipFuture.thenAcceptAsync( + (acceptLeadership) -> { + if (acceptLeadership) { + // confirming the leader session ID might be blocking, + leaderElectionService.confirmLeaderSessionID(newLeaderSessionID); + } + }, + getRpcService().getExecutor()); - // clear the state if we've been the leader before - if (getFencingToken() != null) { - clearStateInternal(); + confirmationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + onFatalError(ExceptionUtils.stripCompletionException(throwable)); } + }); + } - setFencingToken(newResourceManagerId); + private CompletableFuture tryAcceptLeadership(final UUID newLeaderSessionID) { + if (leaderElectionService.hasLeadership(newLeaderSessionID)) { + final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID); - 
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()); + log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId); - prepareLeadershipAsync() - .thenRunAsync(() -> - // confirming the leader session ID might be blocking, - leaderElectionService.confirmLeaderSessionID(newLeaderSessionID), getRpcService().getExecutor()) - .exceptionally(t -> { - onFatalError(t); - return null; - }); - }); + // clear the state if we've been the leader before + if (getFencingToken() != null) { + clearStateInternal(); + } + + setFencingToken(newResourceManagerId); + + slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()); + + return clearStateFuture Review comment: fixed
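The reworked grantLeadership in this hunk is a three-stage chain:

1. tryAcceptLeadership runs on the unfenced main-thread executor and returns a future of its own, which thenCompose(Function.identity()) flattens.
2. The possibly blocking confirmLeaderSessionID call runs on the RPC service's executor.
3. Any failure reaches onFatalError.

The sketch below builds the same chain with plain JDK executors. The class, its log strings, and the instanceof unwrapping (standing in for ExceptionUtils.stripCompletionException) are illustrative assumptions, not Flink code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class GrantLeadershipPipelineDemo {
    // Hypothetical stand-ins for the unfenced main-thread executor and the RPC service executor.
    final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    final ExecutorService rpcExecutor = Executors.newSingleThreadExecutor();
    final List<String> log = Collections.synchronizedList(new ArrayList<>());
    volatile UUID currentLeaderSession; // what a toy leader election service believes

    CompletableFuture<Boolean> tryAcceptLeadership(UUID newLeaderSessionID) {
        if (!newLeaderSessionID.equals(currentLeaderSession)) {
            log.add("stale grant ignored");
            return CompletableFuture.completedFuture(false);
        }
        log.add("fencing token set, components started");
        // stands in for prepareLeadershipAsync(), which may complete later
        return CompletableFuture.completedFuture(true);
    }

    CompletableFuture<Void> grantLeadership(UUID newLeaderSessionID) {
        CompletableFuture<Boolean> acceptLeadershipFuture = CompletableFuture
            .supplyAsync(() -> tryAcceptLeadership(newLeaderSessionID), mainThread)
            .thenCompose(Function.identity()); // flatten the nested future

        CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
            acceptLeadership -> {
                if (acceptLeadership) {
                    log.add("leader session confirmed"); // may block, so not on the main thread
                }
            },
            rpcExecutor);

        // The diff ignores this future; it is returned here only so the demo can wait on it.
        return confirmationFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                    ? throwable.getCause() : throwable;
                log.add("fatal: " + cause);
            }
        });
    }

    static List<String> run(boolean stillLeader) {
        GrantLeadershipPipelineDemo rm = new GrantLeadershipPipelineDemo();
        UUID session = UUID.randomUUID();
        rm.currentLeaderSession = stillLeader ? session : null;
        try {
            rm.grantLeadership(session).join();
            return new ArrayList<>(rm.log);
        } finally {
            rm.mainThread.shutdown();
            rm.rpcExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```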
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568132#comment-16568132 ] ASF GitHub Bot commented on FLINK-9936: --- GJL commented on issue #6464: [FLINK-9936][mesos] Mesos resource manager unable to connect to master after failover URL: https://github.com/apache/flink/pull/6464#issuecomment-410232380 @liurenjie1024 Do you also want to have a final look? The fix works in my cluster tests.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568002#comment-16568002 ] ASF GitHub Bot commented on FLINK-9936: --- tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] Mesos resource manager unable to connect to master after failover URL: https://github.com/apache/flink/pull/6464#discussion_r207491799 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -892,30 +896,48 @@ protected void onFatalError(Throwable t) { */ @Override public void grantLeadership(final UUID newLeaderSessionID) { - runAsyncWithoutFencing( - () -> { - final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID); - - log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId); + final CompletableFuture acceptLeadershipFuture = CompletableFuture.supplyAsync( + () -> tryAcceptLeadership(newLeaderSessionID), + getUnfencedMainThreadExecutor()).thenCompose(Function.identity()); + + final CompletableFuture confirmationFuture = acceptLeadershipFuture.thenAcceptAsync( + (acceptLeadership) -> { + if (acceptLeadership) { + // confirming the leader session ID might be blocking, + leaderElectionService.confirmLeaderSessionID(newLeaderSessionID); + } + }, + getRpcService().getExecutor()); - // clear the state if we've been the leader before - if (getFencingToken() != null) { - clearStateInternal(); + confirmationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + onFatalError(ExceptionUtils.stripCompletionException(throwable)); } + }); + } - setFencingToken(newResourceManagerId); + private CompletableFuture tryAcceptLeadership(final UUID newLeaderSessionID) { + if (leaderElectionService.hasLeadership(newLeaderSessionID)) { + final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID); - 
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()); + log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId); - prepareLeadershipAsync() - .thenRunAsync(() -> - // confirming the leader session ID might be blocking, - leaderElectionService.confirmLeaderSessionID(newLeaderSessionID), getRpcService().getExecutor()) - .exceptionally(t -> { - onFatalError(t); - return null; - }); - }); + // clear the state if we've been the leader before + if (getFencingToken() != null) { + clearStateInternal(); + } + + setFencingToken(newResourceManagerId); + + slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()); + + return clearStateFuture Review comment: Shouldn't we wait for the completion of the `clearStateFuture` before setting the new fencing token and starting components? Otherwise, these newly started components might interact with the ones being shut down.
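The reviewer's concern is about ordering. If clearStateInternal() only starts an asynchronous shutdown, then setting the new fencing token and starting components should happen in a continuation of clearStateFuture. The sketch below shows that ordering with a hypothetical class, where a manually completed future stands in for the shutdown that is still in flight.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class WaitForClearStateDemo {
    final List<String> log = Collections.synchronizedList(new ArrayList<>());
    CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);

    /** Starts clearing the old leader's state; completes once the old components are gone. */
    void clearStateInternal(CompletableFuture<Void> asyncShutdown) {
        clearStateFuture = asyncShutdown.thenRun(() -> log.add("old components stopped"));
    }

    /** Sets the token and starts components only after the cleanup has finished. */
    CompletableFuture<Boolean> tryAcceptLeadership(Executor mainThread) {
        return clearStateFuture.thenApplyAsync(ignored -> {
            log.add("new fencing token set");
            log.add("new components started");
            return true;
        }, mainThread);
    }

    static List<String> run() {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            WaitForClearStateDemo rm = new WaitForClearStateDemo();
            CompletableFuture<Void> shutdown = new CompletableFuture<>(); // still in flight
            rm.clearStateInternal(shutdown);
            CompletableFuture<Boolean> accepted = rm.tryAcceptLeadership(mainThread);
            rm.log.add("grant waiting: " + !accepted.isDone());
            shutdown.complete(null); // the old components finish stopping
            accepted.join();
            return new ArrayList<>(rm.log);
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```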
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567900#comment-16567900 ] ASF GitHub Bot commented on FLINK-9936: --- GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP URL: https://github.com/apache/flink/pull/6464#discussion_r207464729 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -391,13 +391,7 @@ private void recoverWorkers(final List tasksFromPreviou @Override public CompletableFuture postStop() { - final CompletableFuture supportActorsStopFuture = stopSupportingActorsAsync(); - - final CompletableFuture terminationFuture = super.postStop(); - - return supportActorsStopFuture.thenCombine( - terminationFuture, - (Void voidA, Void voidB) -> null); + return super.postStop().thenCompose((ignored) -> stopSupportingActorsAsync()); Review comment: 🤦‍♂️ you are right
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567899#comment-16567899 ] ASF GitHub Bot commented on FLINK-9936: --- GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP URL: https://github.com/apache/flink/pull/6464#discussion_r207464729 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -391,13 +391,7 @@ private void recoverWorkers(final List tasksFromPreviou @Override public CompletableFuture postStop() { - final CompletableFuture supportActorsStopFuture = stopSupportingActorsAsync(); - - final CompletableFuture terminationFuture = super.postStop(); - - return supportActorsStopFuture.thenCombine( - terminationFuture, - (Void voidA, Void voidB) -> null); + return super.postStop().thenCompose((ignored) -> stopSupportingActorsAsync()); Review comment: m(
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567897#comment-16567897 ] ASF GitHub Bot commented on FLINK-9936: --- tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP URL: https://github.com/apache/flink/pull/6464#discussion_r207463934 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -894,17 +900,21 @@ public void grantLeadership(final UUID newLeaderSessionID) { // clear the state if we've been the leader before if (getFencingToken() != null) { - clearState(); + clearStateInternal(); } setFencingToken(newResourceManagerId); slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()); - getRpcService().execute( - () -> + prepareLeadershipAsync() + .exceptionally(t -> { + onFatalError(t); + return null; + }) + .thenRunAsync(() -> // confirming the leader session ID might be blocking, - leaderElectionService.confirmLeaderSessionID(newLeaderSessionID)); + leaderElectionService.confirmLeaderSessionID(newLeaderSessionID), getRpcService().getExecutor()); Review comment: The check would help guard against a concurrent leadership revocation that is triggered just before the asynchronous grant leadership call is executed. That way we would not unnecessarily initialize internal components that the asynchronous revoke leadership callback would then stop again. At the moment this is not very likely to happen, but if we make `clearState` asynchronous and wait in the `grantLeadership` method for the cleanup to complete before calling the leadership callback, it becomes increasingly likely.
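The check under discussion is presumably the leaderElectionService.hasLeadership(newLeaderSessionID) guard that later versions of tryAcceptLeadership in this PR contain. The sketch below replays the race deterministically. A hypothetical queue plays the main thread, and a toy election service loses leadership after the grant callback was enqueued but before it runs. As a result, the guarded callback skips initialization.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.UUID;

class ConcurrentRevokeDemo {

    /** Hypothetical leader election service; only hasLeadership mirrors a name from the diff. */
    static class ToyLeaderElectionService {
        private UUID leaderSessionId;

        void grant(UUID sessionId) { leaderSessionId = sessionId; }

        void revoke() { leaderSessionId = null; }

        boolean hasLeadership(UUID sessionId) { return sessionId.equals(leaderSessionId); }
    }

    final Deque<Runnable> mainThreadQueue = new ArrayDeque<>(); // stand-in for the RPC main thread
    final ToyLeaderElectionService leaderElectionService = new ToyLeaderElectionService();
    final List<String> log = new ArrayList<>();

    void grantLeadership(UUID newLeaderSessionID) {
        mainThreadQueue.add(() -> {
            if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
                log.add("components initialized");
            } else {
                log.add("grant skipped, leadership already revoked");
            }
        });
    }

    void revokeLeadership() {
        mainThreadQueue.add(() -> log.add("revoke callback ran"));
    }

    static List<String> run() {
        ConcurrentRevokeDemo rm = new ConcurrentRevokeDemo();
        UUID session = UUID.randomUUID();
        rm.leaderElectionService.grant(session);
        rm.grantLeadership(session);       // the grant callback is only queued here
        rm.leaderElectionService.revoke(); // leadership is lost before the callback runs
        rm.revokeLeadership();
        Runnable task;
        while ((task = rm.mainThreadQueue.poll()) != null) {
            task.run(); // drain the queue in FIFO order, like a single main thread
        }
        return rm.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Without the guard, the same sequence would initialize components only for the queued revoke callback to stop them again. That is the wasted work the comment describes.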
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567889#comment-16567889 ] ASF GitHub Bot commented on FLINK-9936: --- tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP URL: https://github.com/apache/flink/pull/6464#discussion_r207463112 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -734,8 +734,14 @@ private void clearState() { } catch (Exception e) { onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); } + clearState(); } + /** +* Callback to clear state on leadership revocation. +*/ + protected void clearState() {} Review comment: True, I would like this a bit more, because then we don't duplicate this logic over multiple different `ResourceManager` implementations if needed.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567879#comment-16567879 ] ASF GitHub Bot commented on FLINK-9936: --- GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP URL: https://github.com/apache/flink/pull/6464#discussion_r207461579 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -278,22 +267,76 @@ protected void initialize() throws ResourceManagerException { catch (IOException e) { throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e); } + } - // begin scheduling - connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor); - schedulerDriver.start(); + @Override + protected CompletableFuture prepareLeadershipAsync() { + Preconditions.checkState(initializedMesosConfig != null); + + return clearStateFuture + .thenRunAsync(() -> { + schedulerDriver = initializedMesosConfig.createDriver( + new MesosResourceManagerSchedulerCallback(), + false); + + // create supporting actors + connectionMonitor = createConnectionMonitor(); + launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor); + reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver); + taskMonitor = createTaskMonitor(schedulerDriver); + }, getMainThreadExecutor()) + .thenCombineAsync(getWorkersAsync(), (ignored, tasksFromPreviousAttempts) -> { + // recover state + recoverWorkers(tasksFromPreviousAttempts); + + // begin scheduling + connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor); + schedulerDriver.start(); + + LOG.info("Mesos resource manager started."); + return null; + }, getMainThreadExecutor()); + } - LOG.info("Mesos resource manager initialized."); + @Override + protected void clearState() { + schedulerDriver.stop(true); + + clearStateFuture = stopSupportingActorsAsync().thenRunAsync(() -> { + workersInNew.clear(); + 
workersInLaunch.clear(); + workersBeingReturned.clear(); Review comment: That should be safe to do. Fixed.
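In this revision, prepareLeadershipAsync() first waits for clearStateFuture. It then creates the driver and supporting actors on the main thread, and joins that stage with getWorkersAsync() through thenCombineAsync before recovering workers and starting the driver. The sketch below reproduces that composition with JDK executors. The class, the worker names, and getWorkersAsync's return type are simplified assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PrepareLeadershipDemo {
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    /** Hypothetical: reads workers launched by a previous leader from persistent storage. */
    CompletableFuture<List<String>> getWorkersAsync(Executor io) {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("worker-1", "worker-2"), io);
    }

    CompletableFuture<Void> prepareLeadershipAsync(
            CompletableFuture<Void> clearStateFuture, Executor mainThread, Executor io) {
        return clearStateFuture
            .thenRunAsync(() -> log.add("driver and supporting actors created"), mainThread)
            // getWorkersAsync is invoked while the chain is assembled, so the store read
            // may overlap the cleanup; only this combining step waits for both results
            .thenCombineAsync(getWorkersAsync(io), (ignored, workers) -> {
                log.add("recovered " + workers.size() + " workers");
                log.add("connection monitor and driver started");
                return null;
            }, mainThread);
    }

    static List<String> run() {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            PrepareLeadershipDemo rm = new PrepareLeadershipDemo();
            rm.prepareLeadershipAsync(CompletableFuture.completedFuture(null), mainThread, io).join();
            return new ArrayList<>(rm.log);
        } finally {
            mainThread.shutdown();
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```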
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567885#comment-16567885 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207462086

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ##
@@ -391,13 +391,7 @@ private void recoverWorkers(final List tasksFromPreviou
     @Override
     public CompletableFuture<Void> postStop() {
-        final CompletableFuture<Void> supportActorsStopFuture = stopSupportingActorsAsync();
-
-        final CompletableFuture<Void> terminationFuture = super.postStop();
-
-        return supportActorsStopFuture.thenCombine(
-            terminationFuture,
-            (Void voidA, Void voidB) -> null);
+        return super.postStop().thenCompose((ignored) -> stopSupportingActorsAsync());

Review comment:
I think it should be the other way around: first stop the subclass resources, and then the parent class resources.
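The ordering question in this review comes down to chaining with `thenCompose` versus combining two futures that were both started eagerly. A minimal sketch, assuming hypothetical `ParentEndpointSketch`/`ChildEndpointSketch` classes in place of the real `ResourceManager` hierarchy, of stopping the subclass's supporting actors before the parent class's `postStop()` runs:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins for the parent and subclass shutdown hooks discussed in the review.
class ParentEndpointSketch {
    final List<String> shutdownOrder = new CopyOnWriteArrayList<>();

    CompletableFuture<Void> postStop() {
        // e.g. releasing resources the subclass's supporting actors may still use
        shutdownOrder.add("parent resources");
        return CompletableFuture.completedFuture(null);
    }
}

final class ChildEndpointSketch extends ParentEndpointSketch {
    CompletableFuture<Void> stopSupportingActorsAsync() {
        shutdownOrder.add("supporting actors");
        return CompletableFuture.completedFuture(null);
    }

    @Override
    CompletableFuture<Void> postStop() {
        // thenCompose defers super.postStop() until the supporting actors have stopped.
        // Starting both futures eagerly and joining them with thenCombine (as in the
        // removed code) would let both shutdowns run concurrently.
        return stopSupportingActorsAsync().thenCompose(ignored -> super.postStop());
    }
}
```

Calling `super.postStop()` inside the lambda is legal in Java because `super` in a lambda body refers to the enclosing instance's superclass.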
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567875#comment-16567875 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207461551

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ##
@@ -345,15 +382,20 @@ private void recoverWorkers() throws Exception {
         CompletableFuture stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
         reconciliationCoordinator = null;

Review comment:
You are right. Fixed.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567880#comment-16567880 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207462105

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -734,8 +734,14 @@ private void clearState() {
         } catch (Exception e) {
             onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
         }
+        clearState();
     }
+
+    /**
+     * Callback to clear state on leadership revocation.
+     */
+    protected void clearState() {}

Review comment:
We could also make the interface asynchronous and let the `ResourceManager` handle the waiting in `grantLeadership`.
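One possible shape of the asynchronous interface suggested here, as a sketch only: the hook returns a `CompletableFuture`, and the base class chains the preparation of the new leader session onto it. `AsyncClearStateBase`, `clearStateAsync`, and `MesosLikeSketch` are hypothetical names, not the API that was eventually merged.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical base class: the clear-state hook is asynchronous and the base class waits for it.
abstract class AsyncClearStateBase {
    final List<String> events = new CopyOnWriteArrayList<>();

    /** Subclasses release leader-specific resources; the default has nothing to clear. */
    protected CompletableFuture<Void> clearStateAsync() {
        return CompletableFuture.completedFuture(null);
    }

    protected abstract CompletableFuture<Void> prepareLeadershipAsync();

    /** The base class, not the subclass, sequences clearing before preparing. */
    CompletableFuture<Void> grantLeadership(boolean wasLeaderBefore) {
        final CompletableFuture<Void> cleared;
        if (wasLeaderBefore) {
            cleared = clearStateAsync();
        } else {
            cleared = CompletableFuture.completedFuture(null);
        }
        return cleared.thenCompose(ignored -> prepareLeadershipAsync());
    }
}

// Hypothetical subclass whose clearing finishes only once its supporting actors have stopped.
final class MesosLikeSketch extends AsyncClearStateBase {
    final CompletableFuture<Void> actorsStopped = new CompletableFuture<>();

    @Override
    protected CompletableFuture<Void> clearStateAsync() {
        events.add("clear started");
        return actorsStopped.thenRun(() -> events.add("clear finished"));
    }

    @Override
    protected CompletableFuture<Void> prepareLeadershipAsync() {
        events.add("prepare");
        return CompletableFuture.completedFuture(null);
    }
}
```

With this split, a subclass such as the Mesos resource manager would not need to keep its own `clearStateFuture` field; the waiting lives in one place.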
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567874#comment-16567874 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207461541

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ##
@@ -345,15 +382,20 @@ private void recoverWorkers() throws Exception {
         CompletableFuture stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
         reconciliationCoordinator = null;
-        CompletableFuture<Void> stopFuture = CompletableFuture.allOf(
+        return CompletableFuture.allOf(
             stopTaskMonitorFuture,
             stopConnectionMonitorFuture,
             stopLaunchCoordinatorFuture,
             stopReconciliationCoordinatorFuture);
+    }
+
+    @Override
+    public CompletableFuture<Void> postStop() {
+        final CompletableFuture<Void> supportActorsStopFuture = stopSupportingActorsAsync();
         final CompletableFuture<Void> terminationFuture = super.postStop();

Review comment:
You are right. Fixed.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567873#comment-16567873 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207461514

## File path: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ##
@@ -807,4 +807,21 @@ public void testDisconnected() throws Exception {
             resourceManager.taskRouter.expectMsgClass(Disconnected.class);
         }};
     }
+
+    @Test
+    public void testClearStateRevokeLeadership() throws Exception {
+        new Context() {{
+            MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+            when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+            when(rmServices.workerStore.recoverWorkers()).thenReturn(Collections.singletonList(worker1)).thenReturn(Collections.emptyList());
+
+            startResourceManager();
+            rmServices.rmLeaderElectionService.notLeader();
+            rmServices.grantLeadership();
+
+            //resourceManager.stateCleared.await(5, TimeUnit.SECONDS);

Review comment:
Yes
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567876#comment-16567876 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207461579

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ##
@@ -278,22 +267,76 @@ protected void initialize() throws ResourceManagerException {
         catch (IOException e) {
             throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
         }
+    }
-        // begin scheduling
-        connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
-        schedulerDriver.start();
+    @Override
+    protected CompletableFuture prepareLeadershipAsync() {
+        Preconditions.checkState(initializedMesosConfig != null);
+
+        return clearStateFuture
+            .thenRunAsync(() -> {
+                schedulerDriver = initializedMesosConfig.createDriver(
+                    new MesosResourceManagerSchedulerCallback(),
+                    false);
+
+                // create supporting actors
+                connectionMonitor = createConnectionMonitor();
+                launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
+                reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
+                taskMonitor = createTaskMonitor(schedulerDriver);
+            }, getMainThreadExecutor())
+            .thenCombineAsync(getWorkersAsync(), (ignored, tasksFromPreviousAttempts) -> {
+                // recover state
+                recoverWorkers(tasksFromPreviousAttempts);
+
+                // begin scheduling
+                connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+                schedulerDriver.start();
+
+                LOG.info("Mesos resource manager started.");
+                return null;
+            }, getMainThreadExecutor());
+    }
-        LOG.info("Mesos resource manager initialized.");
+    @Override
+    protected void clearState() {
+        schedulerDriver.stop(true);
+
+        clearStateFuture = stopSupportingActorsAsync().thenRunAsync(() -> {
+            workersInNew.clear();
+            workersInLaunch.clear();
+            workersBeingReturned.clear();

Review comment:
That should be safe to do.
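The `prepareLeadershipAsync` change above relies on two `CompletableFuture` ordering guarantees: `thenRunAsync` waits for `clearStateFuture`, and `thenCombineAsync` waits for both the preceding stage and the worker-recovery future. A reduced sketch, assuming flags in place of actor creation and `schedulerDriver.start()`, and a caller-supplied executor standing in for `getMainThreadExecutor()`; `PrepareLeadershipSketch` is a hypothetical name:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical reduction of prepareLeadershipAsync: actor creation and scheduling are
// replaced by flags, and the main-thread executor is passed in by the caller.
final class PrepareLeadershipSketch {
    private final Executor mainThreadExecutor;
    volatile boolean supportingActorsCreated;
    volatile List<String> recoveredWorkers;
    volatile boolean schedulingStarted;

    PrepareLeadershipSketch(Executor mainThreadExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
    }

    CompletableFuture<Void> prepareLeadershipAsync(
            CompletableFuture<Void> clearStateFuture,
            CompletableFuture<List<String>> workersFuture) {
        return clearStateFuture
            // runs only after the previous leader session's state has been cleared
            .thenRunAsync(() -> supportingActorsCreated = true, mainThreadExecutor)
            // runs only after the actors exist AND worker recovery has finished
            .thenCombineAsync(workersFuture, (ignored, workers) -> {
                recoveredWorkers = workers;
                schedulingStarted = true; // stands in for connectionMonitor.tell(...) and schedulerDriver.start()
                return null;
            }, mainThreadExecutor);
    }
}
```

Passing `Runnable::run` as the executor makes the stages run synchronously, which keeps the example deterministic; in the resource manager, the main-thread executor is meant to serialize these stages with the endpoint's other RPC calls.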
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567872#comment-16567872 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207461505

## File path: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ##
@@ -807,4 +807,21 @@ public void testDisconnected() throws Exception {
             resourceManager.taskRouter.expectMsgClass(Disconnected.class);
         }};
     }
+
+    @Test
+    public void testClearStateRevokeLeadership() throws Exception {
+        new Context() {{
+            MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+            when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+            when(rmServices.workerStore.recoverWorkers()).thenReturn(Collections.singletonList(worker1)).thenReturn(Collections.emptyList());
+
+            startResourceManager();
+            rmServices.rmLeaderElectionService.notLeader();
+            rmServices.grantLeadership();
+
+            //resourceManager.stateCleared.await(5, TimeUnit.SECONDS);
+            assertThat(resourceManager.workersInLaunch.size(), equalTo(0));
+            verify(rmServices.schedulerDriver).stop(true);

Review comment:
Yes
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567871#comment-16567871 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207461474

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -894,17 +900,21 @@ public void grantLeadership(final UUID newLeaderSessionID) {
         // clear the state if we've been the leader before
         if (getFencingToken() != null) {
-            clearState();
+            clearStateInternal();
         }
         setFencingToken(newResourceManagerId);
         slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
-        getRpcService().execute(
-            () ->
+        prepareLeadershipAsync()
+            .exceptionally(t -> {
+                onFatalError(t);
+                return null;
+            })

Review comment:
You are right. Fixed.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567850#comment-16567850 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207456451

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -894,17 +900,21 @@ public void grantLeadership(final UUID newLeaderSessionID) {
         // clear the state if we've been the leader before
         if (getFencingToken() != null) {
-            clearState();
+            clearStateInternal();
         }
         setFencingToken(newResourceManagerId);
         slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
-        getRpcService().execute(
-            () ->
+        prepareLeadershipAsync()
+            .exceptionally(t -> {
+                onFatalError(t);
+                return null;
+            })
+            .thenRunAsync(() ->
                 // confirming the leader session ID might be blocking,
-                leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
+                leaderElectionService.confirmLeaderSessionID(newLeaderSessionID), getRpcService().getExecutor());

Review comment:
I don't understand why it is necessary to check for leadership in `tryAcceptLeadership`. The ZooKeeper `LeaderElectionService` already does that:

```
if (leaderLatch.hasLeadership()) {
    // check if this is an old confirmation call
    synchronized (lock) {
        if (running) {
            if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
                confirmedLeaderSessionID = leaderSessionID;
                writeLeaderInformation(confirmedLeaderSessionID);
            }
        } else {
            LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
                "ZooKeeperLeaderElectionService has already been stopped.", leaderSessionID);
        }
    }
} else {
    LOG.warn("The leader session ID {} was confirmed even though the " +
        "corresponding JobManager was not elected as the leader.", leaderSessionID);
}
```
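The quoted ZooKeeper code accepts a confirmation only while the contender still holds leadership, the service is running, and the session ID is the most recently issued one. A minimal in-memory sketch of that rule, assuming a hypothetical `LeaderElectionSketch` that omits ZooKeeper, the leader latch, logging, and `writeLeaderInformation`:

```java
import java.util.UUID;

// Hypothetical in-memory analogue of the confirmation check quoted above.
final class LeaderElectionSketch {
    private final Object lock = new Object();
    private boolean hasLeadership;
    private boolean running = true;
    private UUID issuedLeaderSessionID;
    private UUID confirmedLeaderSessionID;

    /** Grants leadership and issues a fresh session ID. */
    UUID grant() {
        synchronized (lock) {
            hasLeadership = true;
            issuedLeaderSessionID = UUID.randomUUID();
            return issuedLeaderSessionID;
        }
    }

    void revoke() {
        synchronized (lock) {
            hasLeadership = false;
            confirmedLeaderSessionID = null;
        }
    }

    void stop() {
        synchronized (lock) {
            running = false;
        }
    }

    /** Stale, late, or post-shutdown confirmations are ignored. */
    void confirmLeaderSessionID(UUID leaderSessionID) {
        synchronized (lock) {
            if (hasLeadership && running && leaderSessionID.equals(issuedLeaderSessionID)) {
                confirmedLeaderSessionID = leaderSessionID;
            }
        }
    }

    UUID confirmed() {
        synchronized (lock) {
            return confirmedLeaderSessionID;
        }
    }
}
```

Under this model a late confirmation from an old session is harmless, which is the basis of the question above; whether the resource manager should still avoid preparing leadership for a stale session is a separate concern.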
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567113#comment-16567113 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207306976

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ##
@@ -345,15 +382,20 @@ private void recoverWorkers() throws Exception {
         CompletableFuture stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
         reconciliationCoordinator = null;

Review comment:
I think we need to check whether the supporting actors are non-null before stopping them, because we only create them after the `clearStateFuture` has completed. If this takes a while and our leadership is revoked in the meantime, `clearState` will be called before the supporting actors have been created.
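A sketch of the null check requested here, assuming the supporting actors are represented by a placeholder type and the stop operation is passed in as a function. Both are hypothetical: the real code uses Akka `ActorRef`s and a `stopActor` helper whose full signature is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper; the actor type and stop function are placeholders, not Flink APIs.
final class NullSafeStopSketch {
    /**
     * Stops the actor if it has been created. If leadership was revoked before the supporting
     * actors were created, there is nothing to stop and an already completed future is returned.
     */
    static <A> CompletableFuture<Boolean> stopIfCreated(A actor, Function<A, CompletableFuture<Boolean>> stopActor) {
        if (actor == null) {
            return CompletableFuture.completedFuture(true);
        }
        return stopActor.apply(actor);
    }

    /** Combines the individual stop futures with CompletableFuture.allOf, as in the diff. */
    static <A> CompletableFuture<Void> stopAll(List<A> actors, Function<A, CompletableFuture<Boolean>> stopActor) {
        List<CompletableFuture<Boolean>> stopFutures = new ArrayList<>();
        for (A actor : actors) {
            stopFutures.add(stopIfCreated(actor, stopActor));
        }
        return CompletableFuture.allOf(stopFutures.toArray(new CompletableFuture<?>[0]));
    }
}
```

Returning a completed future for a missing actor keeps the `allOf` combination unchanged, so `clearState` can run safely at any point of the leadership lifecycle.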
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567088#comment-16567088 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207302752

## File path: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ##
@@ -807,4 +807,21 @@ public void testDisconnected() throws Exception {
             resourceManager.taskRouter.expectMsgClass(Disconnected.class);
         }};
     }
+
+    @Test
+    public void testClearStateRevokeLeadership() throws Exception {
+        new Context() {{
+            MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+            when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+            when(rmServices.workerStore.recoverWorkers()).thenReturn(Collections.singletonList(worker1)).thenReturn(Collections.emptyList());
+
+            startResourceManager();
+            rmServices.rmLeaderElectionService.notLeader();
+            rmServices.grantLeadership();
+
+            //resourceManager.stateCleared.await(5, TimeUnit.SECONDS);
+            assertThat(resourceManager.workersInLaunch.size(), equalTo(0));
+            verify(rmServices.schedulerDriver).stop(true);

Review comment:
Is the test already complete?
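Because state clearing runs asynchronously, assertions placed directly after `grantLeadership()` can race with it, which is presumably what the commented-out `stateCleared.await(5, TimeUnit.SECONDS)` line was meant to prevent. A minimal sketch of such a test hook, assuming a hypothetical `ClearStateLatchSketch` whose `CountDownLatch` is named after that line:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical test hook; "stateCleared" follows the commented-out line in the test above.
final class ClearStateLatchSketch {
    final List<String> workersInLaunch = new ArrayList<>();
    final CountDownLatch stateCleared = new CountDownLatch(1);

    CompletableFuture<Void> clearStateAsync(Executor executor) {
        return CompletableFuture.runAsync(() -> {
            workersInLaunch.clear();
            // signal the test only after the asynchronous clearing has finished
            stateCleared.countDown();
        }, executor);
    }

    /** Waits for clearing to finish; returns false on timeout or interruption. */
    boolean awaitStateCleared(long timeout, TimeUnit unit) {
        try {
            return stateCleared.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A successful `await` also guarantees that the cleared collection is visible to the test thread, because `countDown()` happens-before the corresponding `await()` returns.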
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567084#comment-16567084 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207294014

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ##
@@ -345,15 +382,20 @@ private void recoverWorkers() throws Exception {
         CompletableFuture stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
         reconciliationCoordinator = null;
-        CompletableFuture<Void> stopFuture = CompletableFuture.allOf(
+        return CompletableFuture.allOf(
             stopTaskMonitorFuture,
             stopConnectionMonitorFuture,
             stopLaunchCoordinatorFuture,
             stopReconciliationCoordinatorFuture);
+    }
+
+    @Override
+    public CompletableFuture<Void> postStop() {
+        final CompletableFuture<Void> supportActorsStopFuture = stopSupportingActorsAsync();
         final CompletableFuture<Void> terminationFuture = super.postStop();

Review comment:
I think we should call `super.postStop` only after `supportActorsStopFuture` has completed. Otherwise we risk that the parent class shuts down resources that are still used by the supporting actors.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567082#comment-16567082 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207302992

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -894,17 +900,21 @@ public void grantLeadership(final UUID newLeaderSessionID) {
         // clear the state if we've been the leader before
         if (getFencingToken() != null) {
-            clearState();
+            clearStateInternal();
         }
         setFencingToken(newResourceManagerId);
         slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
-        getRpcService().execute(
-            () ->
+        prepareLeadershipAsync()
+            .exceptionally(t -> {
+                onFatalError(t);
+                return null;
+            })
+            .thenRunAsync(() ->
                 // confirming the leader session ID might be blocking,
-                leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
+                leaderElectionService.confirmLeaderSessionID(newLeaderSessionID), getRpcService().getExecutor());

Review comment:
Maybe we could refactor `grantLeadership` the following way:

```
/**
 * Callback method when current resourceManager is granted leadership.
 *
 * @param newLeaderSessionID unique leadershipID
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
    final CompletableFuture<Boolean> acceptLeadershipFuture = CompletableFuture.supplyAsync(
        () -> tryAcceptLeadership(newLeaderSessionID),
        getUnfencedMainThreadExecutor()).thenCompose(Function.identity());

    final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
        (Boolean acceptLeadership) -> {
            if (acceptLeadership) {
                // confirming the leader session ID might be blocking,
                leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
            }
        },
        getRpcService().getExecutor());

    confirmationFuture.whenComplete(
        (Void ignored, Throwable throwable) -> {
            if (throwable != null) {
                onFatalError(ExceptionUtils.stripCompletionException(throwable));
            }
        });
}

private CompletableFuture<Boolean> tryAcceptLeadership(UUID newLeaderSessionID) {
    if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
        final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);

        log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);

        // clear the state if we've been the leader before
        if (getFencingToken() != null) {
            clearStateInternal();
        }

        setFencingToken(newResourceManagerId);

        slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

        return prepareLeadershipAsync().thenApply(ignored -> true);
    } else {
        return CompletableFuture.completedFuture(false);
    }
}
```
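The suggested refactoring combines three `CompletableFuture` techniques: flattening a nested future with `thenCompose(Function.identity())`, running the possibly blocking confirmation on a separate executor, and a single error handler at the end of the chain. The following self-contained sketch isolates them. `GrantLeadershipSketch`, the `stillLeader` flag (standing in for `leaderElectionService.hasLeadership(...)`), and the local `stripCompletionException` helper are assumptions for illustration, not Flink code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of the suggested grantLeadership/tryAcceptLeadership split.
final class GrantLeadershipSketch {
    final AtomicReference<Throwable> fatalError = new AtomicReference<>();
    volatile boolean confirmed;

    private final Executor mainThreadExecutor;
    private final Executor ioExecutor;

    GrantLeadershipSketch(Executor mainThreadExecutor, Executor ioExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.ioExecutor = ioExecutor;
    }

    CompletableFuture<Void> grantLeadership(boolean stillLeader, Supplier<CompletableFuture<Void>> prepareLeadership) {
        // supplyAsync yields CompletableFuture<CompletableFuture<Boolean>>; thenCompose(identity) flattens it
        final CompletableFuture<Boolean> acceptLeadershipFuture = CompletableFuture
            .supplyAsync(() -> tryAcceptLeadership(stillLeader, prepareLeadership), mainThreadExecutor)
            .thenCompose(Function.identity());

        // the possibly blocking confirmation runs on a separate executor
        final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
            (Boolean acceptLeadership) -> {
                if (acceptLeadership) {
                    confirmed = true; // stands in for confirmLeaderSessionID(newLeaderSessionID)
                }
            },
            ioExecutor);

        // one handler at the end sees failures of both preparation and confirmation
        return confirmationFuture.whenComplete((Void ignored, Throwable throwable) -> {
            if (throwable != null) {
                fatalError.set(stripCompletionException(throwable));
            }
        });
    }

    private CompletableFuture<Boolean> tryAcceptLeadership(boolean stillLeader, Supplier<CompletableFuture<Void>> prepareLeadership) {
        if (!stillLeader) {
            return CompletableFuture.completedFuture(false);
        }
        return prepareLeadership.get().thenApply(ignored -> true);
    }

    // Simplified local helper that unwraps CompletionException layers.
    private static Throwable stripCompletionException(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
```

Unwrapping matters because a failure that travels through dependent stages reaches `whenComplete` wrapped in a `CompletionException`; the fatal-error handler then receives the original cause.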
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567085#comment-16567085 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207294205

## File path: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
##
@@ -807,4 +807,21 @@ public void testDisconnected() throws Exception {
             resourceManager.taskRouter.expectMsgClass(Disconnected.class);
         }};
     }
+
+    @Test
+    public void testClearStateRevokeLeadership() throws Exception {
+        new Context() {{
+            MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+            when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+            when(rmServices.workerStore.recoverWorkers()).thenReturn(Collections.singletonList(worker1)).thenReturn(Collections.emptyList());
+
+            startResourceManager();
+            rmServices.rmLeaderElectionService.notLeader();
+            rmServices.grantLeadership();
+
+            //resourceManager.stateCleared.await(5, TimeUnit.SECONDS);

Review comment:
Can this line be removed?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Mesos resource manager unable to connect to master after failover
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
> Issue Type: Bug
> Components: Mesos, Scheduler
> Affects Versions: 1.5.0, 1.5.1, 1.6.0
> Reporter: Renjie Liu
> Assignee: Gary Yao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> When deployed in Mesos session cluster mode, the connection monitor keeps
> reporting that it is unable to connect to Mesos after a restart. In fact, the
> scheduler driver has already connected to the Mesos master, but the "connected"
> message is lost. This is because leadership has not been granted yet and the
> fencing token is not set, so the RPC service ignores the "connected" message.
> We should therefore connect to the Mesos master only after leadership is granted.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
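The root cause described in the ticket can be illustrated with a deliberately reduced model of a fenced endpoint: while no fencing token is set, incoming messages are discarded, so a "connected" notification that arrives before leadership is granted is lost. This is a minimal sketch under that assumption; `FencedMessage` and `FencedEndpointSketch` are hypothetical names and do not reproduce Flink's actual `FencedRpcEndpoint` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** A message tagged with the fencing token its sender saw (hypothetical type). */
final class FencedMessage {
    final UUID fencingToken;
    final String payload;

    FencedMessage(UUID fencingToken, String payload) {
        this.fencingToken = fencingToken;
        this.payload = payload;
    }
}

/**
 * Hypothetical simplification of a fenced RPC endpoint: a message is only
 * processed while a fencing token is set and the message carries that token.
 */
final class FencedEndpointSketch {
    private UUID fencingToken; // stays null until leadership is granted
    final List<String> handled = new ArrayList<>();

    void grantLeadership(UUID newToken) {
        this.fencingToken = newToken;
    }

    /** Returns true if the message was processed, false if it was dropped. */
    boolean deliver(FencedMessage message) {
        if (fencingToken == null || !fencingToken.equals(message.fencingToken)) {
            // No leader yet, or a stale token: the message is silently discarded.
            // A "connected" notification arriving in this window is lost for good.
            return false;
        }
        handled.add(message.payload);
        return true;
    }
}
```

In this model, establishing the Mesos connection only after `grantLeadership` (the fix proposed in the ticket) means the notification arrives while a token is in place.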
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567081#comment-16567081 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207293251

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##
@@ -894,17 +900,21 @@ public void grantLeadership(final UUID newLeaderSessionID) {
         // clear the state if we've been the leader before
         if (getFencingToken() != null) {
-            clearState();
+            clearStateInternal();
         }
         setFencingToken(newResourceManagerId);
         slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
-        getRpcService().execute(
-            () ->
+        prepareLeadershipAsync()
+            .exceptionally(t -> {
+                onFatalError(t);
+                return null;
+            })

Review comment:
Let's move the exception handling to the very end. That way we can also catch if `confirmLeaderSessionID` fails. In all cases, we should call `onFatalError`.
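The suggestion above is to attach the error handler once, at the end of the future chain, so that a failure in any stage, including the leader session confirmation, reaches the fatal error handler. A minimal sketch with `java.util.concurrent.CompletableFuture`, where `prepareLeadershipAsync`, `confirmLeaderSessionId`, and `onFatalError` are hypothetical stand-ins that mirror the names in the review rather than Flink's real signatures:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical sketch: a single error handler attached after all leadership stages. */
final class LeadershipChainSketch {
    final AtomicReference<Throwable> fatalError = new AtomicReference<>();
    private final Supplier<CompletableFuture<Void>> prepareLeadershipAsync;
    private final Runnable confirmLeaderSessionId;

    LeadershipChainSketch(Supplier<CompletableFuture<Void>> prepareLeadershipAsync,
                          Runnable confirmLeaderSessionId) {
        this.prepareLeadershipAsync = prepareLeadershipAsync;
        this.confirmLeaderSessionId = confirmLeaderSessionId;
    }

    CompletableFuture<Void> grantLeadership() {
        return prepareLeadershipAsync.get()
            // runs only if preparation succeeded
            .thenRun(confirmLeaderSessionId)
            // attached last, so it sees failures of preparation AND confirmation
            // (wrapped in a CompletionException by the dependent stage)
            .exceptionally(t -> {
                onFatalError(t);
                return null;
            });
    }

    private void onFatalError(Throwable t) {
        fatalError.compareAndSet(null, t);
    }
}
```

Had `exceptionally` been placed before `thenRun`, as in the quoted diff, an exception thrown by the confirmation step would bypass the handler.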
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567083#comment-16567083 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207295725

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
##
@@ -278,22 +267,76 @@ protected void initialize() throws ResourceManagerException {
         catch (IOException e) {
             throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
         }
+    }
-        // begin scheduling
-        connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
-        schedulerDriver.start();
+    @Override
+    protected CompletableFuture prepareLeadershipAsync() {
+        Preconditions.checkState(initializedMesosConfig != null);
+
+        return clearStateFuture
+            .thenRunAsync(() -> {
+                schedulerDriver = initializedMesosConfig.createDriver(
+                    new MesosResourceManagerSchedulerCallback(),
+                    false);
+
+                // create supporting actors
+                connectionMonitor = createConnectionMonitor();
+                launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
+                reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
+                taskMonitor = createTaskMonitor(schedulerDriver);
+            }, getMainThreadExecutor())
+            .thenCombineAsync(getWorkersAsync(), (ignored, tasksFromPreviousAttempts) -> {
+                // recover state
+                recoverWorkers(tasksFromPreviousAttempts);
+
+                // begin scheduling
+                connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+                schedulerDriver.start();
+
+                LOG.info("Mesos resource manager started.");
+                return null;
+            }, getMainThreadExecutor());
+    }
-        LOG.info("Mesos resource manager initialized.");
+    @Override
+    protected void clearState() {
+        schedulerDriver.stop(true);
+
+        clearStateFuture = stopSupportingActorsAsync().thenRunAsync(() -> {
+            workersInNew.clear();
+            workersInLaunch.clear();
+            workersBeingReturned.clear();

Review comment:
Can't we clear these fields right away when `clearState` is called?
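The question above is about ordering: the worker bookkeeping can be reset synchronously inside `clearState`, while only the shutdown of the supporting actors has to be asynchronous, and the next leadership grant must wait for that shutdown. A hedged sketch of that ordering, where `stopSupportingActorsAsync` is modelled as a future supplied by the caller and the three maps merely stand in for `workersInNew`, `workersInLaunch`, and `workersBeingReturned` (all hypothetical simplifications):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical model of the clear-state / re-grant ordering discussed in the review. */
final class ClearStateSketch {
    final Map<String, String> workersInNew = new HashMap<>();
    final Map<String, String> workersInLaunch = new HashMap<>();
    final Map<String, String> workersBeingReturned = new HashMap<>();
    final List<String> events = new ArrayList<>();

    private CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);

    void clearState(Supplier<CompletableFuture<Void>> stopSupportingActorsAsync) {
        // Plain bookkeeping can be reset immediately; nothing has to wait for it.
        workersInNew.clear();
        workersInLaunch.clear();
        workersBeingReturned.clear();
        events.add("state cleared");
        // Only the actor shutdown is asynchronous; remember it for the next grant.
        clearStateFuture = stopSupportingActorsAsync.get()
            .thenRun(() -> events.add("actors stopped"));
    }

    CompletableFuture<Void> prepareLeadershipAsync() {
        // New driver and actors are created only after the old ones are gone.
        return clearStateFuture.thenRun(() -> events.add("actors recreated"));
    }
}
```

With this split, a re-grant still waits for the old actors to stop, but no stale worker entries remain visible in the meantime.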
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16565607#comment-16565607 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r206951198

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
##
@@ -278,22 +268,76 @@ protected void initialize() throws ResourceManagerException {
         catch (IOException e) {
             throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
         }
+    }
-        // begin scheduling
-        connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
-        schedulerDriver.start();
+    @Override
+    protected CompletableFuture prepareLeadershipAsync() {
+        Preconditions.checkState(initializedMesosConfig != null);
+
+        return clearStateFuture
+            .thenRunAsync(() -> {
+                schedulerDriver = initializedMesosConfig.createDriver(
+                    new MesosResourceManagerSchedulerCallback(),
+                    false);
+
+                // create supporting actors
+                connectionMonitor = createConnectionMonitor();
+                launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
+                reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
+                taskMonitor = createTaskMonitor(schedulerDriver);
+            }, getMainThreadExecutor())
+            .thenCombineAsync(getWorkersAsync(), (ignored, tasksFromPreviousAttempts) -> {
+                // recover state
+                recoverWorkers(tasksFromPreviousAttempts);
+
+                // begin scheduling
+                connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+                schedulerDriver.start();
+
+                LOG.info("Mesos resource manager started.");
+                return null;
+            }, getMainThreadExecutor());
+    }
-        LOG.info("Mesos resource manager initialized.");
+    @Override
+    protected void clearState() {
+        schedulerDriver.stop(true);
+
+        clearStateFuture = stopSupportingActorsAsync().thenRunAsync(() -> {
+            workersInNew.clear();
+            workersInLaunch.clear();
+            workersBeingReturned.clear();
+        }, getUnfencedMainThreadExecutor());
     }
     /**
-     * Recover framework/worker information persisted by a prior incarnation of the RM.
+     * Fetches framework/worker information persisted by a prior incarnation of the RM.
      */
-    private void recoverWorkers() throws Exception {
+    private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
         // if this resource manager is recovering from failure,
         // then some worker tasks are most likely still alive and we can re-obtain them
-        final List tasksFromPreviousAttempts = workerStore.recoverWorkers();
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                final List tasksFromPreviousAttempts = workerStore.recoverWorkers();
+                for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+                    if (worker.state() == MesosWorkerStore.WorkerState.New) {
+                        // remove new workers because allocation requests are transient
+                        workerStore.removeWorker(worker.taskID());

Review comment:
This is copied code but I don't understand why it's needed.
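The copied code drops persisted workers that are still in the `New` state, per its comment that allocation requests are transient. One possible reading, which is an assumption and not confirmed in this thread, is that a `New` worker only represents an outstanding request that was never launched, so after failover nobody is waiting for it any more. A self-contained sketch of that filtering step, using a hypothetical `Worker`/`WorkerState` model rather than Flink's `MesosWorkerStore.Worker`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical worker states, loosely mirroring the names in the diff. */
enum WorkerState { New, Launched, Released }

/** Hypothetical stand-in for a persisted worker entry. */
final class Worker {
    final String taskId;
    final WorkerState state;

    Worker(String taskId, WorkerState state) {
        this.taskId = taskId;
        this.state = state;
    }
}

/** Hypothetical recovery filter: drop entries that are only pending requests. */
final class WorkerRecoverySketch {
    final Set<String> removedFromStore = new TreeSet<>();

    List<Worker> recover(List<Worker> persisted) {
        List<Worker> recovered = new ArrayList<>();
        for (Worker worker : persisted) {
            if (worker.state == WorkerState.New) {
                // Never launched: treat as a transient allocation request and forget it.
                removedFromStore.add(worker.taskId);
            } else {
                // Launched or released workers may still map to live Mesos tasks.
                recovered.add(worker);
            }
        }
        return recovered;
    }
}
```

Under this reading, keeping `New` entries would leave requests in the store that no current requester is waiting for.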
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16565341#comment-16565341 ] ASF GitHub Bot commented on FLINK-9936: --- GJL commented on a change in pull request #6464: [FLINK-9936][mesos] WIP URL: https://github.com/apache/flink/pull/6464#discussion_r206884046 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -279,21 +270,55 @@ protected void initialize() throws ResourceManagerException { throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e); } - // begin scheduling - connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor); - schedulerDriver.start(); - LOG.info("Mesos resource manager initialized."); } + @Override + protected CompletableFuture prepareLeadershipAsync() { + return getRpcService().execute(() -> recoverWorkers()) + .thenAcceptAsync((tasksFromPreviousAttempts) -> { + try { + recoverWorkers(tasksFromPreviousAttempts); + } catch (Exception e) { + throw new CompletionException(new ResourceManagerException(e)); + } + + // begin scheduling + connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor); + schedulerDriver.start(); + + LOG.info("Mesos resource manager started."); + }, getMainThreadExecutor()); + } + + @Override + protected void clearState() { + schedulerDriver.stop(true); Review comment: From my experiments it seems that it is not possible to stop and re-start the `SchedulerDriver`. I'd suggest re-creating the supporting actors and the `SchedulerDriver`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
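Since the experiments above suggest that a stopped `SchedulerDriver` cannot be started again, one option is to create a fresh driver (and the actors that reference it) for every leadership term. A hedged sketch of that lifecycle using a hypothetical `Driver` interface and factory; it does not use the real `org.apache.mesos.SchedulerDriver` API, and the `OneShotDriver` merely simulates the restart restriction reported in the review:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical minimal driver abstraction. */
interface Driver {
    void start();
    void stop(boolean failover);
}

/** Hypothetical driver that refuses to restart, simulating the observed behaviour. */
final class OneShotDriver implements Driver {
    private boolean started;
    private boolean stopped;

    @Override
    public void start() {
        if (started || stopped) {
            throw new IllegalStateException("driver cannot be restarted");
        }
        started = true;
    }

    @Override
    public void stop(boolean failover) {
        stopped = true;
    }
}

/** Creates a new driver per leadership term instead of restarting the old one. */
final class DriverLifecycleSketch {
    private final Supplier<Driver> driverFactory;
    final List<Driver> createdDrivers = new ArrayList<>();
    private Driver current;

    DriverLifecycleSketch(Supplier<Driver> driverFactory) {
        this.driverFactory = driverFactory;
    }

    void onLeadershipGranted() {
        current = driverFactory.get(); // never reuse a stopped driver
        createdDrivers.add(current);
        current.start();
    }

    void onLeadershipRevoked() {
        if (current != null) {
            current.stop(true); // mirrors schedulerDriver.stop(true) in the diff
            current = null;
        }
    }
}
```

The factory indirection keeps the resource manager free of driver-restart assumptions: each grant gets a driver that has never been started.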
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16565340#comment-16565340 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on issue #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#issuecomment-409579075

@liurenjie1024 I currently have resources to work on this full time. I think I can create a new PR today or tomorrow. I'd be happy if you could review the code.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564913#comment-16564913 ]

ASF GitHub Bot commented on FLINK-9936:
---

liurenjie1024 commented on issue #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#issuecomment-409484118

Hi @GJL: If you don't have enough time for this, I would still like to work on it under your guidance.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564903#comment-16564903 ]

ASF GitHub Bot commented on FLINK-9936:
---

zentol commented on issue #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#issuecomment-409481500

If you want to share work-in-progress with another committer, please just send him the branch directly.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564307#comment-16564307 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL opened a new pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464

WIP

cc: @tillrohrmann
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16563719#comment-16563719 ]

ASF GitHub Bot commented on FLINK-9936:
---

liurenjie1024 commented on issue #6451: [FLINK-9936] Resource manager connect to mesos after leadership granted. .
URL: https://github.com/apache/flink/pull/6451#issuecomment-409232804

@GJL Thanks for the review. It's OK for you to take it over.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16563658#comment-16563658 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on a change in pull request #6451: [FLINK-9936] Resource manager connect to mesos after leadership granted. .
URL: https://github.com/apache/flink/pull/6451#discussion_r206522513

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
##
@@ -221,10 +221,14 @@ protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriv
     //
     /**
-     * Starts the Mesos-specifics.
+     * Do nothing and all work has been moved to on leadership granted callback.
      */
     @Override
     protected void initialize() throws ResourceManagerException {
+    }
+
+    @Override
+    protected void onLeaderShipGranted() throws Exception {

Review comment:
We are re-creating the supporting actors without shutting them down. This will likely leak memory every time the instance wins the leader election. It is also not very nice that we occupy the main thread of the actor with blocking IO.
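On the second point of the review, a common way to keep blocking I/O (such as reading persisted workers from a ZooKeeper-backed store) off an endpoint's main thread is to run the blocking call on a separate I/O executor and hop back to a single-threaded "main" executor before touching state. A hedged sketch of that pattern; the two executors and `blockingRecoverWorkers` are hypothetical stand-ins, not Flink's `getMainThreadExecutor()` or `MesosWorkerStore`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: blocking I/O on an I/O pool, state mutation on one "main" thread. */
final class OffloadSketch implements AutoCloseable {
    private final ExecutorService mainThread =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
    final List<String> recoveredWorkers = new ArrayList<>(); // only mutated on mainThread
    volatile String ioThreadName;
    volatile String mutationThreadName;

    /** Stand-in for a blocking read from persistent storage. */
    private List<String> blockingRecoverWorkers() {
        ioThreadName = Thread.currentThread().getName();
        return Arrays.asList("worker-1", "worker-2");
    }

    CompletableFuture<Void> recoverAsync() {
        return CompletableFuture
            .supplyAsync(this::blockingRecoverWorkers, ioExecutor) // main thread stays free
            .thenAcceptAsync(workers -> {
                mutationThreadName = Thread.currentThread().getName();
                recoveredWorkers.addAll(workers); // single-threaded access to state
            }, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
        ioExecutor.shutdown();
    }
}
```

The same split appears in the later revision of the PR quoted in this thread, where `getWorkersAsync()` is combined with work scheduled on `getMainThreadExecutor()`.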
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561819#comment-16561819 ]

ASF GitHub Bot commented on FLINK-9936:
---

GJL commented on issue #6451: [FLINK-9936] Resource manager connect to mesos after leadership granted. .
URL: https://github.com/apache/flink/pull/6451#issuecomment-408838604

I'll take a look today.

> Mesos resource manager unable to connect to master after failover
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
> Issue Type: Bug
> Components: Mesos, Scheduler
> Affects Versions: 1.5.0, 1.5.1, 1.6.0
> Reporter: Renjie Liu
> Assignee: Renjie Liu
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> When deployed in Mesos session cluster mode, the connection monitor keeps
> reporting that it is unable to connect to Mesos after a restart. In fact, the
> scheduler driver has already connected to the Mesos master, but the "connected"
> message is lost. This is because leadership has not been granted yet and the
> fencing token is not set, so the RPC service ignores the "connected" message.
> We should therefore connect to the Mesos master only after leadership is granted.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561795#comment-16561795 ]

ASF GitHub Bot commented on FLINK-9936:
---

yanghua commented on a change in pull request #6451: [FLINK-9936] Resource manager connect to mesos after leadership granted. .
URL: https://github.com/apache/flink/pull/6451#discussion_r206098411

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
##
@@ -283,7 +287,14 @@ protected void initialize() throws ResourceManagerException {
         connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
         schedulerDriver.start();
-        LOG.info("Mesos resource manager initialized.");
+        LOG.info("Mesos resource manager started.");
+    }
+
+    @Override
+    protected void onLeaderShipRevoked() throws Exception {
+        workerStore.stop(false);

Review comment:
We should wrap each of these three calls in a try/catch block to make sure that every component is stopped even if a previous statement throws an exception.
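The suggestion above is that all three shutdown calls should run even if an earlier one throws. A hedged sketch of a common way to do this: run every step, keep the first exception, attach later ones as suppressed, and rethrow at the end. `ShutdownSteps` is a hypothetical helper; in the diff its steps would correspond to `workerStore.stop(false)`, `schedulerDriver.stop(true)`, and `disconnected(new Disconnected())`:

```java
/** Hypothetical helper: runs every shutdown step even if earlier steps fail. */
final class ShutdownSteps {
    /** A shutdown action that may throw, e.g. stopping a store or a driver. */
    @FunctionalInterface
    interface Step {
        void run() throws Exception;
    }

    private ShutdownSteps() {
    }

    static void runAll(Step... steps) throws Exception {
        Exception first = null;
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                if (first == null) {
                    first = e; // remember the first failure and keep going
                } else {
                    first.addSuppressed(e); // later failures stay visible as suppressed
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

Rethrowing the first failure (rather than swallowing it) lets the caller still escalate to `onFatalError`, as the merged `revokeLeadership` does.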
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561796#comment-16561796 ]

ASF GitHub Bot commented on FLINK-9936:
---

yanghua commented on a change in pull request #6451: [FLINK-9936] Resource manager connect to mesos after leadership granted. .
URL: https://github.com/apache/flink/pull/6451#discussion_r206097341

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
##
@@ -221,10 +221,14 @@ protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriv
     //
     /**
-     * Starts the Mesos-specifics.
+     * Do nothing and all work has been moved to on leadership granted callback.

Review comment:
Replacing "on leadership granted" with "onLeaderShipGranted" looks better to me.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561610#comment-16561610 ]

ASF GitHub Bot commented on FLINK-9936:
---

liurenjie1024 opened a new pull request #6451: [FLINK-9936] Resource manager connect to mesos after leadership granted. .
URL: https://github.com/apache/flink/pull/6451

## What is the purpose of the change

This PR fixes a bug in the Mesos resource manager that prevents it from connecting to Mesos after a failover.

## Brief change log

- Add an **onLeadershipGranted** callback to the resource manager.
- Add an **onLeadershipRevoked** callback to the resource manager.

## Verifying this change

This change added tests and can be verified as follows:

- Manually verified the change by running a cluster on mesos, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
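The change log lists two new callbacks on the resource manager. A hedged, self-contained sketch of how such template-method hooks can be wired into leadership grant and revocation, with any exception escalated to a fatal-error handler. `LeaderAwareManager` and `RecordingManager` are illustrative names only; the real `ResourceManager` also manages the slot manager, leader session confirmation, and more:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical base class: subclasses hold external connections only while they lead. */
abstract class LeaderAwareManager {
    private UUID fencingToken; // null while not leader
    final List<Throwable> fatalErrors = new ArrayList<>();

    final void grantLeadership(UUID newToken) {
        fencingToken = newToken; // set the token first so incoming messages are accepted
        try {
            onLeadershipGranted();
        } catch (Exception e) {
            onFatalError(e);
        }
    }

    final void revokeLeadership() {
        try {
            onLeadershipRevoked();
        } catch (Exception e) {
            onFatalError(e);
        }
        fencingToken = null;
    }

    UUID getFencingToken() {
        return fencingToken;
    }

    /** Hook: connect to the external cluster manager. */
    protected abstract void onLeadershipGranted() throws Exception;

    /** Hook: disconnect and release leadership-scoped resources. */
    protected abstract void onLeadershipRevoked() throws Exception;

    private void onFatalError(Throwable t) {
        fatalErrors.add(t);
    }
}

/** Hypothetical subclass that records the calls it receives. */
final class RecordingManager extends LeaderAwareManager {
    final List<String> calls = new ArrayList<>();

    @Override
    protected void onLeadershipGranted() {
        calls.add(getFencingToken() != null ? "connect (token set)" : "connect (no token)");
    }

    @Override
    protected void onLeadershipRevoked() {
        calls.add("disconnect");
    }
}
```

Setting the token before invoking the grant hook is the point of the fix: by the time the subclass connects, fenced messages from that connection are no longer dropped.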
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561101#comment-16561101 ]

Renjie Liu commented on FLINK-9936:
---

[~gjy] I'm working on this and have already built an internal version and tested it in our own deployment, where it works well. I'm going to publish our patch and add some tests to it.

> Mesos resource manager unable to connect to master after failover
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
> Issue Type: Bug
> Components: Mesos, Scheduler
> Affects Versions: 1.5.0, 1.5.1, 1.6.0
> Reporter: Renjie Liu
> Assignee: Renjie Liu
> Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> When deployed in Mesos session cluster mode, the connection monitor keeps
> reporting that it is unable to connect to Mesos after a restart. In fact, the
> scheduler driver has already connected to the Mesos master, but the "connected"
> message is lost. This is because leadership has not been granted yet and the
> fencing token is not set, so the RPC service ignores the "connected" message.
> We should therefore connect to the Mesos master only after leadership is granted.
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561093#comment-16561093 ]

Gary Yao commented on FLINK-9936:
---

[~liurenjie1024] What's the state of this ticket? Do you have a solution in mind? I think we need to add a callback to the runnable scheduled in {{ResourceManager#grantLeadership}}. What do you think?