[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15215832#comment-15215832 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1741

> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210126#comment-15210126 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1741#issuecomment-200803051

Continuing my review now, but don't block this on me. If I find anything crucial, I will open a pull request against master if it is merged by then.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210037#comment-15210037 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57295865

--- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -240,6 +246,37 @@
      */
     public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
+
+    // Common Resource Framework Configuration (YARN & Mesos)
+
+    /**
+     * Percentage of heap space to remove from containers (YARN / Mesos), to compensate
+     * for other JVM memory usage.
+     */
+    public static final String CONTAINERED_HEAP_CUTOFF_RATIO = "containered.heap-cutoff-ratio";
+
+    /**
+     * Minimum amount of heap memory to remove in containers, as a safety margin.
+     */
+    public static final String CONTAINERED_HEAP_CUTOFF_MIN = "containered.heap-cutoff-min";
+
+    /**
+     * Prefix for passing custom environment variables to Flink's master process.
+     * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
+     * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+     * in the flink-conf.yaml.
+     */
+    public static final String CONTAINERED_MASTER_ENV_PREFIX = "containered.application-master.env.";
+
+    /**
+     * Similar to the {@see CONTAINERED_MASTER_ENV_PREFIX}, this configuration prefix allows
+     * setting custom environment variables for the workers (TaskManagers)
+     */
+    public static final String CONTAINERED_TASK_MANAGER_ENV_PREFIX = "containered.taskmanager.env.";
--- End diff --

I thought about these prefixes again. I think they make sense but we could possibly change them before the release in a follow-up.
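For readers following the thread, the two heap-cutoff keys and the two env prefixes quoted in the diff above can be pictured with a small Java sketch. It assumes the cutoff is the larger of `ratio * containerMemory` and the configured minimum, and that environment variables are collected by stripping the prefix from matching configuration keys. The class name `ContaineredConfigSketch`, its method names, and the sample values (4096 MB, 0.25, 600 MB) are hypothetical and are not taken from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: hypothetical helper, not part of the pull request. */
final class ContaineredConfigSketch {

    /**
     * Returns the JVM heap size in MB left after removing the cutoff.
     * Assumption: cutoff = max(containerMemoryMb * cutoffRatio, cutoffMinMb).
     */
    static long heapSizeMb(long containerMemoryMb, double cutoffRatio, long cutoffMinMb) {
        if (containerMemoryMb <= 0) {
            throw new IllegalArgumentException("container memory must be positive");
        }
        if (cutoffRatio < 0.0 || cutoffRatio >= 1.0) {
            throw new IllegalArgumentException("cutoff ratio must be in [0, 1)");
        }
        long cutoff = Math.max((long) (containerMemoryMb * cutoffRatio), cutoffMinMb);
        if (cutoff >= containerMemoryMb) {
            throw new IllegalArgumentException("cutoff leaves no heap for the JVM");
        }
        return containerMemoryMb - cutoff;
    }

    /**
     * Collects environment variables from configuration entries that start with
     * the given prefix, e.g. "containered.taskmanager.env.". The prefix is
     * stripped to obtain the variable name.
     */
    static Map<String, String> envFromPrefix(Map<String, String> conf, String prefix) {
        Map<String, String> env = new HashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                env.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return env;
    }

    public static void main(String[] args) {
        // The ratio dominates for a large container, the minimum for a small one.
        System.out.println(heapSizeMb(4096, 0.25, 600)); // 3072
        System.out.println(heapSizeMb(1024, 0.25, 600)); // 424

        Map<String, String> conf = new HashMap<>();
        conf.put("containered.taskmanager.env.LD_LIBRARY_PATH", "/usr/lib/native");
        conf.put("containered.heap-cutoff-ratio", "0.25");
        System.out.println(envFromPrefix(conf, "containered.taskmanager.env."));
    }
}
```

Taking the maximum keeps a fixed safety margin for small containers while still scaling with larger ones, which is one reading of the two Javadoc comments in the diff.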
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210040#comment-15210040 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1741#issuecomment-200769986

I've incorporated the changes and the tests pass. I would like to merge the pull request. Please let me know if there are still pending code reviews.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206875#comment-15206875 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1741#issuecomment-199934108

Thanks a lot @tillrohrmann for taking the time to look into the bulk of code. I have already eagerly addressed most of your comments in the additional commits I pushed. Next, I'll revise the cluster shutdown logic and the re-connect in case of unresponsiveness of the resource manager.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206850#comment-15206850 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57031901

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -312,59 +323,121 @@ class JobManager(
       leaderSessionID = None
-    case RegisterTaskManager(
-      connectionInfo,
-      hardwareInformation,
-      numberOfSlots) =>
+    case msg: RegisterResourceManager =>
+      log.debug(s"Resource manager registration: $msg")
+
+      // ditch current resource manager (if any)
+      currentResourceManager = Option(msg.resourceManager())
+
+      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
+        instance => instance.getResourceId).toList.asJava
+
+      // confirm registration and send known task managers with their resource ids
+      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
+
+    case msg: DisconnectResourceManager =>
+      log.debug(s"Resource manager disconnect: $msg")
+
+      currentResourceManager match {
+        case Some(rm) if rm.equals(msg.resourceManager()) =>
+          // we should ditch the current resource manager
+          log.debug(s"Disconnecting resource manager $rm.")
+          // send the old one a disconnect message
+          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+          currentResourceManager = None
+        case None =>
+          // not connected, thus ignoring this message
+          log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
+      }
+
+    case msg @ RegisterTaskManager(
+          resourceId,
+          connectionInfo,
+          hardwareInformation,
+          numberOfSlots) =>
+      // we are being informed by the ResourceManager that a new task manager is available
+      log.debug(s"RegisterTaskManager: $msg")
       val taskManager = sender()
+      currentResourceManager match {
+        case Some(rm) =>
+          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
+          future.onComplete {
+            case scala.util.Success(response) =>
+              // the resource manager is available and answered
+              self ! response
+            case scala.util.Failure(t) =>
--- End diff --

Done (doesn't change the diff).
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206835#comment-15206835 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1741#issuecomment-199920379

Great work @mxm! I really like the new architecture :-) I had some minor comments. The only thing which is important to fix is that the `JobManager` can terminate if it is not connected to a `FlinkResourceManager`. Apart from that, I think we should merge it soon so that it can get some exposure.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206832#comment-15206832 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57030485 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java --- @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.Props; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.yarn.messages.ContainersAllocated; +import org.apache.flink.yarn.messages.ContainersComplete; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Specialized Flink Resource Manager implementation for 
YARN clusters. It is started as the + * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure + * monitoring. + */ +public class YarnFlinkResourceManager extends FlinkResourceManager { + + /** The heartbeat interval while the resource master is waiting for containers */ + private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500; + + /** The default heartbeat interval during regular operation */ + private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; + + /** The containers where a TaskManager is starting and we are waiting for it to register */ + private final MapcontainersInLaunch; + + /** Containers we have released, where we are waiting for an acknowledgement that +* they are released */ + private final Map containersBeingReturned; + + /** The YARN / Hadoop configuration object */ + private final YarnConfiguration yarnConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final ContaineredTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final ContainerLaunchContext taskManagerLaunchContext; + + /** Host name for the container
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206815#comment-15206815 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57028233

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -312,59 +323,121 @@ class JobManager(
       leaderSessionID = None
-    case RegisterTaskManager(
-      connectionInfo,
-      hardwareInformation,
-      numberOfSlots) =>
+    case msg: RegisterResourceManager =>
+      log.debug(s"Resource manager registration: $msg")
+
+      // ditch current resource manager (if any)
+      currentResourceManager = Option(msg.resourceManager())
+
+      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
+        instance => instance.getResourceId).toList.asJava
+
+      // confirm registration and send known task managers with their resource ids
+      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
+
+    case msg: DisconnectResourceManager =>
+      log.debug(s"Resource manager disconnect: $msg")
+
+      currentResourceManager match {
+        case Some(rm) if rm.equals(msg.resourceManager()) =>
+          // we should ditch the current resource manager
+          log.debug(s"Disconnecting resource manager $rm.")
+          // send the old one a disconnect message
+          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+          currentResourceManager = None
+        case None =>
+          // not connected, thus ignoring this message
+          log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
+      }
+
+    case msg @ RegisterTaskManager(
+          resourceId,
+          connectionInfo,
+          hardwareInformation,
+          numberOfSlots) =>
+      // we are being informed by the ResourceManager that a new task manager is available
+      log.debug(s"RegisterTaskManager: $msg")
       val taskManager = sender()
+      currentResourceManager match {
+        case Some(rm) =>
+          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
+          future.onComplete {
+            case scala.util.Success(response) =>
+              // the resource manager is available and answered
+              self ! response
+            case scala.util.Failure(t) =>
--- End diff --

Could be good to log the failure.
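The suggestion to log the failure can be sketched in Java as follows. This is not the JobManager code: `CompletableFuture` stands in for the Akka ask future, plain lists stand in for the logger and for `self ! response`, and every name in the block is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Illustrative only: a stand-in for the ask/onComplete handling in the diff. */
final class RegistrationFailureLoggingSketch {

    /** Stand-in for a logger; collects warning lines. */
    final List<String> log = new ArrayList<>();

    /** Stand-in for "self ! response"; collects forwarded replies. */
    final List<String> forwarded = new ArrayList<>();

    /** Handles the reply to a registration request, logging failures instead of dropping them. */
    void onRegistrationReply(CompletableFuture<String> reply) {
        reply.whenComplete((response, failure) -> {
            if (failure == null) {
                // the resource manager is available and answered
                forwarded.add(response);
            } else {
                // surface the cause so a missing answer is visible in the logs
                log.add("WARN: resource manager did not confirm registration: "
                        + failure.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        RegistrationFailureLoggingSketch sketch = new RegistrationFailureLoggingSketch();

        // successful answer
        sketch.onRegistrationReply(CompletableFuture.completedFuture("registration-ack"));

        // failed answer, e.g. a timeout
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("ask timed out"));
        sketch.onRegistrationReply(failed);

        System.out.println(sketch.forwarded);
        System.out.println(sketch.log);
    }
}
```

Without the failure branch, a timeout of the resource manager would pass silently, which is presumably what the review comment is about.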
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206695#comment-15206695 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57017514 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java --- @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.Props; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.yarn.messages.ContainersAllocated; +import org.apache.flink.yarn.messages.ContainersComplete; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Specialized Flink Resource Manager implementation for 
YARN clusters. It is started as the + * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure + * monitoring. + */ +public class YarnFlinkResourceManager extends FlinkResourceManager { + + /** The heartbeat interval while the resource master is waiting for containers */ + private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500; + + /** The default heartbeat interval during regular operation */ + private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; + + /** The containers where a TaskManager is starting and we are waiting for it to register */ + private final MapcontainersInLaunch; + + /** Containers we have released, where we are waiting for an acknowledgement that +* they are released */ + private final Map containersBeingReturned; + + /** The YARN / Hadoop configuration object */ + private final YarnConfiguration yarnConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final ContaineredTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final ContainerLaunchContext taskManagerLaunchContext; + + /** Host name for the
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206753#comment-15206753 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57021932 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { + + private static ActorSystem system; + + private static Configuration config = new Configuration(); + + @Before + public void setup() { + system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Tests whether the resource manager connects and reconciles existing task managers. +*/ + @Test + public void testResourceManagerReconciliation() { + + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + ActorGateway jobManager = TestingUtils.createJobManager(system, config); + ActorGateway me = + TestingUtils.createForwardingActor(system, getTestActor(), Option.empty()); + + // !! no resource manager started !! 
+ + ResourceID resourceID = ResourceID.generate(); + + jobManager.tell( + new RegistrationMessages.RegisterTaskManager( + resourceID, + Mockito.mock(InstanceConnectionInfo.class), + null, + 1), + me); + + expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class); + + // now start the resource manager + ActorGateway resourceManager = + TestingUtils.createResourceManager(system, jobManager.actor(), config); + + // register at testing job manager to receive a message once a resource manager registers + resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me); + + // Wait for resource manager + expectMsgEquals(Messages.getAcknowledge()); + + // check if we registered the task manager resource +
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206703#comment-15206703 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57018050

--- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---
@@ -115,7 +115,7 @@ class ApplicationClient(
         val jobManager = context.actorSelection(jobManagerAkkaURL)
         jobManager ! decorateMessage(
-          RegisterApplicationClient
+          RegisterInfoMessageListener.get()
--- End diff --

`getInstance`?
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206704#comment-15206704 ]

ASF GitHub Bot commented on FLINK-3544:
---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57018075

--- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---
@@ -148,7 +148,7 @@ class ApplicationClient(
           INITIAL_POLLING_DELAY,
           WAIT_FOR_YARN_INTERVAL,
           yarnJobManager.get,
-          decorateMessage(PollYarnClusterStatus))
+          decorateMessage(GetClusterStatus.get()))
--- End diff --

`getInstance`?
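The `getInstance` naming suggested in the two comments above usually goes with a stateless singleton message. A minimal Java sketch follows, assuming the message carries no fields and must survive serialization between actors. The class name `GetClusterStatusSketch` and the `roundTrip` helper are hypothetical and are not the classes from the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Illustrative only: a stateless message exposed through getInstance(). */
final class GetClusterStatusSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final GetClusterStatusSketch INSTANCE = new GetClusterStatusSketch();

    private GetClusterStatusSketch() {}

    /** Conventional accessor name for a singleton. */
    static GetClusterStatusSketch getInstance() {
        return INSTANCE;
    }

    /** Keeps the singleton property when the message is deserialized. */
    private Object readResolve() {
        return INSTANCE;
    }

    /** Serializes and deserializes an object in memory (hypothetical helper). */
    static Object roundTrip(Object message) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(message);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Object copy = roundTrip(getInstance());
        // reference equality holds because readResolve returns the shared instance
        System.out.println(copy == getInstance());
    }
}
```

The accessor name matters little technically; `getInstance` simply signals to the reader that no new object is created per message.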
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206658#comment-15206658 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57015042 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import 
java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link YarnFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container + * allocation and failure detection. + */ +public class YarnApplicationMasterRunner { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + + //
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206625#comment-15206625 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57011989 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import 
java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link YarnFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container + * allocation and failure detection. + */ +public class YarnApplicationMasterRunner { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + + //
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206622#comment-15206622 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57011773 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -39,24 +39,32 @@ import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint._ import org.apache.flink.runtime.client._ import org.apache.flink.runtime.execution.SuppressRestartsException +import org.apache.flink.runtime.clusterframework.FlinkResourceManager +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager +import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, RestartStrategyFactory} import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex} -import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} +import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, --- End diff -- `HardwareDescription` and `InstanceConnectionInfo` are unused > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206619#comment-15206619 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57011690 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -23,7 +23,7 @@ import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, import java.util.UUID import java.util.concurrent.{TimeUnit, ExecutorService} -import akka.actor.Status.Failure +import akka.actor.Status.{Success, Failure} --- End diff -- `Success` unused > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206614#comment-15206614 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57011449 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -60,8 +63,8 @@ * See documentation */ public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) { - float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); - int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_MIN_HEAP_CUTOFF); + float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.25f); + int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 600); --- End diff -- I was actually asking why you don't use `ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF` instead of `600`. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206594#comment-15206594 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57009817 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -60,8 +63,8 @@ * See documentation */ public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) { - float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); - int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_MIN_HEAP_CUTOFF); + float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.25f); + int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 600); --- End diff -- Originally, those were going to be replaced by generalized keys and values. However, we need default values for every resource management system. So I'll use the generalized keys with the default yarn values here. 
Like so:
```java
float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
int minCutoff = conf.getInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
```
The deprecated keys are replaced beforehand using
```java
BootstrapTools.substituteDeprecatedConfigKey(conf, ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO);
BootstrapTools.substituteDeprecatedConfigKey(conf, ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN);
```
> ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
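For readers following the thread, here is a minimal, self-contained sketch of the idea in the comment above: copy a value from a deprecated key to its generalized key, then read the generalized key with the YARN defaults. It is not the code from the pull request. The class `HeapCutoffSketch`, the plain `Map` standing in for Flink's `Configuration`, the `yarn.*` key strings, and the cutoff arithmetic are all assumptions made for illustration; only the `containered.*` key names and the defaults `0.25f` and `600` come from the quoted diffs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for illustration; not Flink's BootstrapTools or Utils. */
class HeapCutoffSketch {

    // The containered.* names appear in the ConfigConstants diff; the yarn.* strings are assumed.
    static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio";
    static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min";
    static final String CONTAINERED_HEAP_CUTOFF_RATIO = "containered.heap-cutoff-ratio";
    static final String CONTAINERED_HEAP_CUTOFF_MIN = "containered.heap-cutoff-min";

    // Default values as they appear inline in the Utils.java diff.
    static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
    static final int DEFAULT_YARN_HEAP_CUTOFF = 600;

    /** Copies the deprecated key's value to the new key unless the new key is already set. */
    static void substituteDeprecatedConfigKey(Map<String, String> conf, String deprecated, String designated) {
        if (!conf.containsKey(designated) && conf.containsKey(deprecated)) {
            conf.put(designated, conf.get(deprecated));
        }
    }

    /** Assumed arithmetic: remove max(memory * ratio, minCutoff) from the container memory. */
    static int calculateHeapSize(int memory, Map<String, String> conf) {
        float ratio = conf.containsKey(CONTAINERED_HEAP_CUTOFF_RATIO)
                ? Float.parseFloat(conf.get(CONTAINERED_HEAP_CUTOFF_RATIO))
                : DEFAULT_YARN_HEAP_CUTOFF_RATIO;
        int minCutoff = conf.containsKey(CONTAINERED_HEAP_CUTOFF_MIN)
                ? Integer.parseInt(conf.get(CONTAINERED_HEAP_CUTOFF_MIN))
                : DEFAULT_YARN_HEAP_CUTOFF;
        int cutoff = Math.max((int) (memory * ratio), minCutoff);
        return memory - cutoff;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(YARN_HEAP_CUTOFF_MIN, "300"); // user still sets the old key

        // Replace deprecated keys before reading the generalized ones.
        substituteDeprecatedConfigKey(conf, YARN_HEAP_CUTOFF_RATIO, CONTAINERED_HEAP_CUTOFF_RATIO);
        substituteDeprecatedConfigKey(conf, YARN_HEAP_CUTOFF_MIN, CONTAINERED_HEAP_CUTOFF_MIN);

        System.out.println(calculateHeapSize(1024, conf)); // prints 724
    }
}
```

With this arrangement a configuration that still uses the old YARN-specific key behaves the same as one using the generalized key, and the defaults live in one place rather than as literals at the call site.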
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206573#comment-15206573 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57008796 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import 
java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link YarnFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container + * allocation and failure detection. + */ +public class YarnApplicationMasterRunner { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + + //
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206575#comment-15206575 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57008853 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import 
java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link YarnFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container + * allocation and failure detection. + */ +public class YarnApplicationMasterRunner { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + + //
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206544#comment-15206544 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57006429 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { + + private static ActorSystem system; + + private static Configuration config = new Configuration(); + + @Before + public void setup() { + system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Tests whether the resource manager connects and reconciles existing task managers. +*/ + @Test + public void testResourceManagerReconciliation() { + + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + ActorGateway jobManager = TestingUtils.createJobManager(system, config); + ActorGateway me = + TestingUtils.createForwardingActor(system, getTestActor(), Option.empty()); + + // !! no resource manager started !! 
+ + ResourceID resourceID = ResourceID.generate(); + + jobManager.tell( + new RegistrationMessages.RegisterTaskManager( + resourceID, + Mockito.mock(InstanceConnectionInfo.class), + null, + 1), + me); + + expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class); + + // now start the resource manager + ActorGateway resourceManager = + TestingUtils.createResourceManager(system, jobManager.actor(), config); + + // register at testing job manager to receive a message once a resource manager registers + resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me); + + // Wait for resource manager + expectMsgEquals(Messages.getAcknowledge()); + + // check if we registered the task manager resource +
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206540#comment-15206540 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57006267 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { + + private static ActorSystem system; + + private static Configuration config = new Configuration(); + + @Before + public void setup() { + system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Tests whether the resource manager connects and reconciles existing task managers. +*/ + @Test + public void testResourceManagerReconciliation() { + + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + ActorGateway jobManager = TestingUtils.createJobManager(system, config); + ActorGateway me = + TestingUtils.createForwardingActor(system, getTestActor(), Option.empty()); + + // !! no resource manager started !! 
+ + ResourceID resourceID = ResourceID.generate(); + + jobManager.tell( + new RegistrationMessages.RegisterTaskManager( + resourceID, + Mockito.mock(InstanceConnectionInfo.class), + null, + 1), + me); + + expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class); + + // now start the resource manager + ActorGateway resourceManager = + TestingUtils.createResourceManager(system, jobManager.actor(), config); + + // register at testing job manager to receive a message once a resource manager registers + resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me); + + // Wait for resource manager + expectMsgEquals(Messages.getAcknowledge()); + + // check if we registered the task manager resource +
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206529#comment-15206529 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57005630 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { + + private static ActorSystem system; + + private static Configuration config = new Configuration(); + + @Before + public void setup() { + system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Tests whether the resource manager connects and reconciles existing task managers. +*/ + @Test + public void testResourceManagerReconciliation() { + + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { --- End diff -- Then I guess we have a different understanding of readability. 
> ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206533#comment-15206533 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57005727 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala --- @@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, Input import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat} import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job} +import scala.annotation.implicitNotFound --- End diff -- Good question, it's hopefully the latter case. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206525#comment-15206525 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57004770 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -60,8 +63,8 @@ * See documentation */ public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) { - float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); - int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_MIN_HEAP_CUTOFF); + float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.25f); + int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 600); --- End diff -- Why not use the default values defined in `ConfigConstants`? > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206506#comment-15206506 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57002955

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
    @@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, Input
     import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
     import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}

    +import scala.annotation.implicitNotFound
    --- End diff --

    Ah, true. Why doesn't our Scala Checkstyle detect unused imports? Is that not supported, or not configured?
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206499#comment-15206499 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57002327

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
    @@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, Input
     import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
     import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}

    +import scala.annotation.implicitNotFound
    --- End diff --

    Unused import
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206489#comment-15206489 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57001487 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { + + private static ActorSystem system; + + private static Configuration config = new Configuration(); + + @Before + public void setup() { + system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Tests whether the resource manager connects and reconciles existing task managers. +*/ + @Test + public void testResourceManagerReconciliation() { + + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + ActorGateway jobManager = TestingUtils.createJobManager(system, config); + ActorGateway me = + TestingUtils.createForwardingActor(system, getTestActor(), Option.empty()); + + // !! no resource manager started !! 
+ + ResourceID resourceID = ResourceID.generate(); + + jobManager.tell( + new RegistrationMessages.RegisterTaskManager( + resourceID, + Mockito.mock(InstanceConnectionInfo.class), + null, + 1), + me); + + expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class); + + // now start the resource manager + ActorGateway resourceManager = + TestingUtils.createResourceManager(system, jobManager.actor(), config); + + // register at testing job manager to receive a message once a resource manager registers + resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me); + + // Wait for resource manager + expectMsgEquals(Messages.getAcknowledge()); + + // check if we registered the task manager resource +
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206482#comment-15206482 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57000864 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { + + private static ActorSystem system; + + private static Configuration config = new Configuration(); + + @Before + public void setup() { + system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Tests whether the resource manager connects and reconciles existing task managers. +*/ + @Test + public void testResourceManagerReconciliation() { + + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { --- End diff -- This is intentional. For me, readability sometimes takes priority over strict style.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206485#comment-15206485 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57000923 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; + +public class BootstrapToolsTest { + + @Test + public void testSubstituteConfigKey() { + String deprecatedKey1 ="deprecated-key"; + String deprecatedKey2 ="another-out_of-date_key"; + String deprecatedKey3 ="yet-one-more"; + + String designatedKey1 ="newkey1"; + String designatedKey2 ="newKey2"; + String designatedKey3 ="newKey3"; + + String value1 = "value1"; + String value2_designated = "designated-value2"; + String value2_deprecated = "deprecated-value2"; + + // config contains only deprecated key 1, and for key 2 both deprecated and designated + Configuration cfg = new Configuration(); + cfg.setString(deprecatedKey1, value1); + cfg.setString(deprecatedKey2, value2_deprecated); + cfg.setString(designatedKey2, value2_designated); + + BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey1, designatedKey1); + BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey2, designatedKey2); + BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey3, designatedKey3); + + // value 1 should be set to designated + assertEquals(value1, cfg.getString(designatedKey1, null)); + + // value 2 should not have been set, since it had a value already + assertEquals(value2_designated, cfg.getString(designatedKey2, null)); + + // nothing should be in there for key 3 + assertNull(cfg.getString(designatedKey3, null)); + assertNull(cfg.getString(deprecatedKey3, null)); + } + + @Test + public void testSubstituteConfigKeyPrefix() { + String deprecatedPrefix1 ="deprecated-prefix"; + String deprecatedPrefix2 ="-prefix-2"; + String deprecatedPrefix3 ="prefix-3"; + + String designatedPrefix1 ="p1"; + String designatedPrefix2 ="ppp"; + String designatedPrefix3 ="zzz"; + + String depr1 = 
deprecatedPrefix1 + "var"; + String depr2 = deprecatedPrefix2 + "env"; + String depr3 = deprecatedPrefix2 + "x"; + + String desig1 = deprecatedPrefix1 + "var"; + String desig2 = deprecatedPrefix2 + "env"; + String desig3 = deprecatedPrefix2 + "x"; --- End diff -- Nope. Good catch. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206484#comment-15206484 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57000909 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { --- End diff -- Thanks. Adding. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206399#comment-15206399 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56989537 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; + +public class BootstrapToolsTest { + + @Test + public void testSubstituteConfigKey() { + String deprecatedKey1 ="deprecated-key"; + String deprecatedKey2 ="another-out_of-date_key"; + String deprecatedKey3 ="yet-one-more"; + + String designatedKey1 ="newkey1"; + String designatedKey2 ="newKey2"; + String designatedKey3 ="newKey3"; + + String value1 = "value1"; + String value2_designated = "designated-value2"; + String value2_deprecated = "deprecated-value2"; + + // config contains only deprecated key 1, and for key 2 both deprecated and designated + Configuration cfg = new Configuration(); + cfg.setString(deprecatedKey1, value1); + cfg.setString(deprecatedKey2, value2_deprecated); + cfg.setString(designatedKey2, value2_designated); + + BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey1, designatedKey1); + BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey2, designatedKey2); + BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey3, designatedKey3); + + // value 1 should be set to designated + assertEquals(value1, cfg.getString(designatedKey1, null)); + + // value 2 should not have been set, since it had a value already + assertEquals(value2_designated, cfg.getString(designatedKey2, null)); + + // nothing should be in there for key 3 + assertNull(cfg.getString(designatedKey3, null)); + assertNull(cfg.getString(deprecatedKey3, null)); + } + + @Test + public void testSubstituteConfigKeyPrefix() { + String deprecatedPrefix1 ="deprecated-prefix"; + String deprecatedPrefix2 ="-prefix-2"; + String deprecatedPrefix3 ="prefix-3"; + + String designatedPrefix1 ="p1"; + String designatedPrefix2 ="ppp"; + String designatedPrefix3 ="zzz"; + + String depr1 = 
deprecatedPrefix1 + "var"; + String depr2 = deprecatedPrefix2 + "env"; + String depr3 = deprecatedPrefix2 + "x"; + + String desig1 = deprecatedPrefix1 + "var"; + String desig2 = deprecatedPrefix2 + "env"; + String desig3 = deprecatedPrefix2 + "x"; --- End diff -- Is it intended that you used the deprecatedPrefix for the designated keys? > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
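The behaviour exercised by the quoted test can be sketched on a plain map. This is a hedged illustration, not `BootstrapTools` itself: `DeprecatedKeySketch` and both method names are hypothetical, a `Map<String, String>` stands in for Flink's `Configuration`, and the prefix variant is an assumption modelled on the single-key case (copy the value over only when the designated key is not already set). It also shows why the designated test keys need the designated prefixes: otherwise the substitution result can never be observed under a new name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the configuration-substitution helpers under test.
final class DeprecatedKeySketch {

    /** Copies the deprecated key's value to the designated key unless the designated key is already set. */
    static void substituteDeprecatedKey(Map<String, String> cfg, String deprecated, String designated) {
        String value = cfg.get(deprecated);
        if (value != null && !cfg.containsKey(designated)) {
            cfg.put(designated, value);
        }
    }

    /** Assumed analogue for prefixes: "oldPrefix + suffix" becomes "newPrefix + suffix". */
    static void substituteDeprecatedPrefix(Map<String, String> cfg, String deprecatedPrefix, String designatedPrefix) {
        // collect additions first so the map is not modified while iterating over it
        Map<String, String> additions = new HashMap<>();
        for (Map.Entry<String, String> entry : cfg.entrySet()) {
            if (entry.getKey().startsWith(deprecatedPrefix)) {
                String newKey = designatedPrefix + entry.getKey().substring(deprecatedPrefix.length());
                if (!cfg.containsKey(newKey)) {
                    additions.put(newKey, entry.getValue());
                }
            }
        }
        cfg.putAll(additions);
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();
        cfg.put("deprecated-prefixvar", "1");
        substituteDeprecatedPrefix(cfg, "deprecated-prefix", "p1");
        // the designated key is built from the designated prefix, not the deprecated one
        System.out.println(cfg.get("p1" + "var"));
    }
}
```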
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206405#comment-15206405 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56990053 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { --- End diff -- Extending `TestLogger` will give us nice log output showing which test case is currently being run. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206407#comment-15206407 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56990136 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { + + private static ActorSystem system; + + private static Configuration config = new Configuration(); + + @Before + public void setup() { + system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Tests whether the resource manager connects and reconciles existing task managers. 
+*/ + @Test + public void testResourceManagerReconciliation() { + + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { --- End diff -- indentation > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206377#comment-15206377 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56987194

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---
    @@ -211,6 +238,13 @@ class LocalFlinkMiniCluster(
           JobManager.JOB_MANAGER_NAME
         }
       }
    +  protected def getResourceManagerName(index: Int): String = {
    +    if(singleActorSystem) {
    +      FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1)
    +    } else {
    +      FlinkResourceManager.RESOURCE_MANAGER_NAME
    +    }
    +  }
    --- End diff --

    Corrected, but not part of this diff.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206352#comment-15206352 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56985677 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,125 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None +case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + +case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { +case Some(rm) => + val future = (rm ? 
decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { +case scala.util.Success(response) => + // the resource manager is available and answered + self ! response +case scala.util.Failure(t) => + // slow or unreachable resource manager, register anyway and let the rm reconnect + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + self ! decorateMessage(new DisconnectResourceManager(rm)) + }(context.dispatcher) + +case None => + log.info("Task Manager Registration but not connected to ResourceManager") + // ResourceManager not yet available + // sending task manager information later upon ResourceManager registration + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + } + +case msg: RegisterResourceSuccessful => + + val originalMsg = msg.getRegistrationMessage + val taskManager = msg.getTaskManager + + // ResourceManager knows about the resource, now let's try to register TaskManager if (instanceManager.isRegistered(taskManager)) { val instanceID = instanceManager.getRegisteredInstance(taskManager).getId -// IMPORTANT: Send the response to the "sender", which is not the -//TaskManager actor, but the ask future! -sender() ! decorateMessage( +taskManager ! decorateMessage( AlreadyRegistered( instanceID, -libraryCacheManager.getBlobServerPort) -) - } - else { +libraryCacheManager.getBlobServerPort)) + } else { try { val instanceID = instanceManager.registerTaskManager( taskManager, -connectionInfo, -hardwareInformation, -numberOfSlots, +originalMsg.resourceId, +originalMsg.connectionInfo, +originalMsg.resources, +originalMsg.numberOfSlots, leaderSessionID.orNull) - // IMPORTANT: Send the response to the "sender", which is not the - //TaskManager actor, but the ask future! - sender() !
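The registration flow in the quoted hunk (ask the resource manager; on failure register the task manager anyway and trigger a disconnect; with no resource manager connected, register directly) can be sketched without Akka. This is only an illustration under stated assumptions: `RegistrationFallbackSketch` is a hypothetical class, a `CompletableFuture<String>` stands in for the Akka ask future, and messages are plain strings named after the ones in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the JobManager's handling of RegisterTaskManager.
final class RegistrationFallbackSketch {

    /** Messages the job manager would send to itself, recorded as strings for inspection. */
    final List<String> selfMessages = new ArrayList<>();

    /**
     * @param resourceManagerReply empty if no resource manager is connected, otherwise the
     *                             pending reply to the RegisterResource request
     */
    void onRegisterTaskManager(String taskManager, Optional<CompletableFuture<String>> resourceManagerReply) {
        if (!resourceManagerReply.isPresent()) {
            // no resource manager yet: proceed with the registration right away
            selfMessages.add("RegisterResourceSuccessful(" + taskManager + ")");
            return;
        }
        // Note: with an already-completed future this callback runs synchronously; a real
        // actor would instead receive the forwarded message on its own mailbox thread.
        resourceManagerReply.get().whenComplete((reply, failure) -> {
            if (failure == null) {
                // the resource manager answered: forward its response
                selfMessages.add(reply);
            } else {
                // slow or unreachable: register anyway and let the resource manager reconnect
                selfMessages.add("RegisterResourceSuccessful(" + taskManager + ")");
                selfMessages.add("DisconnectResourceManager");
            }
        });
    }

    public static void main(String[] args) {
        RegistrationFallbackSketch jobManager = new RegistrationFallbackSketch();
        jobManager.onRegisterTaskManager("tm-1", Optional.empty());

        CompletableFuture<String> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new RuntimeException("ask timed out"));
        jobManager.onRegisterTaskManager("tm-2", Optional.of(timedOut));

        System.out.println(jobManager.selfMessages);
    }
}
```

Routing every outcome back through a self-addressed message, as the diff does, keeps the state changes on the actor's own thread instead of inside the future callback.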
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206344#comment-15206344 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56985225

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -928,11 +934,6 @@ class TaskManager(
           jm => context.unwatch(jm)
         }
    -    // de-register from the JobManager (faster detection of disconnect)
    -    currentJobManager foreach {
    -      _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is disassociating"))
    -    }
    --- End diff --

    Also an artifact of the original approach. Reverting.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206335#comment-15206335 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56984100

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -592,19 +605,18 @@ class TaskManager(
               log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
                 s"because the TaskManager is already registered at ${currentJobManager.orNull}")
             }
    -      }
    -      else {
    +      } else {
             // not yet connected, so let's associate with that JobManager
             try {
               associateWithJobManager(jobManager, id, blobPort)
             } catch {
               case t: Throwable =>
                 killTaskManagerFatal(
    -              "Unable to start TaskManager components after registering at JobManager", t)
    +              "Unable to start TaskManager components and associate with the JobManager ", t)
    --- End diff --

    Not necessary. Removing whitespace.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206333#comment-15206333 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56983997

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -632,34 +643,32 @@ class TaskManager(
         case RefuseRegistration(reason) =>
           if (currentJobManager.isEmpty) {
    -        log.error(s"The registration at JobManager $jobManagerAkkaURL was refused, " +
    +        log.error(s"The registration at ResourceManager $jobManagerAkkaURL was refused, " +
    --- End diff --

    This is an artifact of refactoring the original approach that I followed where the TaskManager would register at the ResourceManager. Reverting.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206329#comment-15206329 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56983799

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -592,19 +605,18 @@ class TaskManager(
               log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
                 s"because the TaskManager is already registered at ${currentJobManager.orNull}")
             }
    -      }
    -      else {
    +      } else {
             // not yet connected, so let's associate with that JobManager
             try {
               associateWithJobManager(jobManager, id, blobPort)
             } catch {
               case t: Throwable =>
                 killTaskManagerFatal(
    -              "Unable to start TaskManager components after registering at JobManager", t)
    +              "Unable to start TaskManager components and associate with the JobManager ", t)
             }
           }
    -    // we are already registered at that specific JobManager - duplicate answer, rare cases
    +    // we are already registered at this ResourceManager - duplicate answer, rare cases
         case AlreadyRegistered(id, blobPort) =>
           val jobManager = sender()
    --- End diff --

    This is an artifact of refactoring the original approach that I followed where the TaskManager would register at the ResourceManager. Reverting.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206330#comment-15206330 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56983829

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -613,12 +625,11 @@ class TaskManager(
               log.debug("Ignoring duplicate registration acknowledgement.")
             } else {
               log.warn(s"Received 'AlreadyRegistered' message from " +
    -            s"JobManager ${jobManager.path}, even through TaskManager is currently " +
    +            s"ResourceManager ${jobManager.path}, even through TaskManager is currently " +
    --- End diff --

    This is an artifact of refactoring the original approach that I followed where the TaskManager would register at the ResourceManager. Reverting.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206188#comment-15206188 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56969482

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -1823,8 +1836,8 @@ object TaskManager {
        * @param taskManagerUrl The akka URL of the JobManager.
        * @param system The local actor system that should perform the lookup.
        * @param timeout The maximum time to wait until the lookup fails.
    -   * @throws java.io.IOException Thrown, if the lookup fails.
    -   * @return The ActorRef to the TaskManager
    +    * @throws java.io.IOException Thrown, if the lookup fails.
    +    * @return The ActorRef to the TaskManager
    --- End diff --

    formatting regression
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206189#comment-15206189 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56969518

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -2066,8 +2079,7 @@ object TaskManager {
        * @param parameter The parameter value. Will be shown in the exception message.
        * @param name The name of the config parameter. Will be shown in the exception message.
        * @param errorMessage The optional custom error message to append to the exception message.
    -   *
    -   * @throws IllegalConfigurationException Thrown if the condition is violated.
    +    * @throws IllegalConfigurationException Thrown if the condition is violated.
    --- End diff --

    Wrong indentation
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206187#comment-15206187 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56969207

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -928,11 +934,6 @@ class TaskManager(
           jm => context.unwatch(jm)
         }
    -    // de-register from the JobManager (faster detection of disconnect)
    -    currentJobManager foreach {
    -      _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is disassociating"))
    -    }
    --- End diff --

    Why don't we need this anymore?
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206181#comment-15206181 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56968776

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -632,34 +643,32 @@ class TaskManager(
         case RefuseRegistration(reason) =>
           if (currentJobManager.isEmpty) {
    -        log.error(s"The registration at JobManager $jobManagerAkkaURL was refused, " +
    +        log.error(s"The registration at ResourceManager $jobManagerAkkaURL was refused, " +
    --- End diff --

    RM vs. JM conflict?
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206178#comment-15206178 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56968677

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -613,12 +625,11 @@ class TaskManager(
               log.debug("Ignoring duplicate registration acknowledgement.")
             } else {
               log.warn(s"Received 'AlreadyRegistered' message from " +
    -            s"JobManager ${jobManager.path}, even through TaskManager is currently " +
    +            s"ResourceManager ${jobManager.path}, even through TaskManager is currently " +
    --- End diff --

    Does `jobManager` contain the `ActorRef` of the RM or the JM?
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206177#comment-15206177 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56968619

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -592,19 +605,18 @@ class TaskManager(
               log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
                 s"because the TaskManager is already registered at ${currentJobManager.orNull}")
             }
    -      }
    -      else {
    +      } else {
             // not yet connected, so let's associate with that JobManager
             try {
               associateWithJobManager(jobManager, id, blobPort)
             } catch {
               case t: Throwable =>
                 killTaskManagerFatal(
    -              "Unable to start TaskManager components after registering at JobManager", t)
    +              "Unable to start TaskManager components and associate with the JobManager ", t)
             }
           }
    -    // we are already registered at that specific JobManager - duplicate answer, rare cases
    +    // we are already registered at this ResourceManager - duplicate answer, rare cases
         case AlreadyRegistered(id, blobPort) =>
           val jobManager = sender()
    --- End diff --

    Is this the JM or the RM address which we store here in `jobManager`?
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206176#comment-15206176 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56968482

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -592,19 +605,18 @@ class TaskManager(
               log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
                 s"because the TaskManager is already registered at ${currentJobManager.orNull}")
             }
    -      }
    -      else {
    +      } else {
             // not yet connected, so let's associate with that JobManager
             try {
               associateWithJobManager(jobManager, id, blobPort)
             } catch {
               case t: Throwable =>
                 killTaskManagerFatal(
    -              "Unable to start TaskManager components after registering at JobManager", t)
    +              "Unable to start TaskManager components and associate with the JobManager ", t)
    --- End diff --

    Why the whitespace at the end of the error message?
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206167#comment-15206167 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56967954

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---
    @@ -211,6 +238,13 @@ class LocalFlinkMiniCluster(
           JobManager.JOB_MANAGER_NAME
         }
       }
    +  protected def getResourceManagerName(index: Int): String = {
    --- End diff --

    line break missing before this line
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206168#comment-15206168 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56968005

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---
    @@ -211,6 +238,13 @@ class LocalFlinkMiniCluster(
           JobManager.JOB_MANAGER_NAME
         }
       }
    +  protected def getResourceManagerName(index: Int): String = {
    +    if(singleActorSystem) {
    +      FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1)
    +    } else {
    +      FlinkResourceManager.RESOURCE_MANAGER_NAME
    +    }
    +  }
    --- End diff --

    After this line there is also a line break missing
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206133#comment-15206133 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56964129

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -312,59 +323,125 @@ class JobManager(
           leaderSessionID = None
    -    case RegisterTaskManager(
    -      connectionInfo,
    -      hardwareInformation,
    -      numberOfSlots) =>
    +    case msg: RegisterResourceManager =>
    +      log.debug(s"Resource manager registration: $msg")
    +
    +      // ditch current resource manager (if any)
    +      currentResourceManager = Option(msg.resourceManager())
    +
    +      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
    +        instance => instance.getResourceId).toList.asJava
    +
    +      // confirm registration and send known task managers with their resource ids
    +      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
    +
    +    case msg: DisconnectResourceManager =>
    +      log.debug(s"Resource manager disconnect: $msg")
    +
    +      currentResourceManager match {
    +        case Some(rm) if rm.equals(msg.resourceManager()) =>
    +          // we should ditch the current resource manager
    +          log.debug(s"Disconnecting resource manager $rm.")
    +          // send the old one a disconnect message
    +          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
    +          currentResourceManager = None
    +        case None =>
    +          // not connected, thus ignoring this message
    +          log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
    +      }
    +
    +    case msg @ RegisterTaskManager(
    +          resourceId,
    +          connectionInfo,
    +          hardwareInformation,
    +          numberOfSlots) =>
    +      // we are being informed by the ResourceManager that a new task manager is available
    +      log.debug(s"RegisterTaskManager: $msg")
           val taskManager = sender()
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
    +          future.onComplete {
    +            case scala.util.Success(response) =>
    +              // the resource manager is available and answered
    +              self ! response
    +            case scala.util.Failure(t) =>
    +              // slow or unreachable resource manager, register anyway and let the rm reconnect
    +              self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
    +              self ! decorateMessage(new DisconnectResourceManager(rm))
    +          }(context.dispatcher)
    +
    +        case None =>
    +          log.info("Task Manager Registration but not connected to ResourceManager")
    +          // ResourceManager not yet available
    +          // sending task manager information later upon ResourceManager registration
    +          self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
    +      }
    +
    +    case msg: RegisterResourceSuccessful =>
    +
    +      val originalMsg = msg.getRegistrationMessage
    +      val taskManager = msg.getTaskManager
    +
    +      // ResourceManager knows about the resource, now let's try to register TaskManager
           if (instanceManager.isRegistered(taskManager)) {
             val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
    -        // IMPORTANT: Send the response to the "sender", which is not the
    -        //   TaskManager actor, but the ask future!
    -        sender() ! decorateMessage(
    +        taskManager ! decorateMessage(
               AlreadyRegistered(
                 instanceID,
    -            libraryCacheManager.getBlobServerPort)
    -        )
    -      }
    -      else {
    +            libraryCacheManager.getBlobServerPort))
    +      } else {
             try {
               val instanceID = instanceManager.registerTaskManager(
                 taskManager,
    -            connectionInfo,
    -            hardwareInformation,
    -            numberOfSlots,
    +            originalMsg.resourceId,
    +            originalMsg.connectionInfo,
    +            originalMsg.resources,
    +            originalMsg.numberOfSlots,
                 leaderSessionID.orNull)
    -          // IMPORTANT: Send the response to the "sender", which is not the
    -          //   TaskManager actor, but the ask future!
    -          sender() !
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206128#comment-15206128 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56963220

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -312,59 +323,125 @@ class JobManager(
           leaderSessionID = None
    -    case RegisterTaskManager(
    -      connectionInfo,
    -      hardwareInformation,
    -      numberOfSlots) =>
    +    case msg: RegisterResourceManager =>
    +      log.debug(s"Resource manager registration: $msg")
    +
    +      // ditch current resource manager (if any)
    +      currentResourceManager = Option(msg.resourceManager())
    +
    +      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
    +        instance => instance.getResourceId).toList.asJava
    +
    +      // confirm registration and send known task managers with their resource ids
    +      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
    +
    +    case msg: DisconnectResourceManager =>
    +      log.debug(s"Resource manager disconnect: $msg")
    +
    +      currentResourceManager match {
    +        case Some(rm) if rm.equals(msg.resourceManager()) =>
    +          // we should ditch the current resource manager
    +          log.debug(s"Disconnecting resource manager $rm.")
    +          // send the old one a disconnect message
    +          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
    +          currentResourceManager = None
    +        case None =>
    +          // not connected, thus ignoring this message
    +          log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
    +      }
    +
    +    case msg @ RegisterTaskManager(
    +          resourceId,
    +          connectionInfo,
    +          hardwareInformation,
    +          numberOfSlots) =>
    +      // we are being informed by the ResourceManager that a new task manager is available
    +      log.debug(s"RegisterTaskManager: $msg")
           val taskManager = sender()
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
    +          future.onComplete {
    +            case scala.util.Success(response) =>
    +              // the resource manager is available and answered
    +              self ! response
    +            case scala.util.Failure(t) =>
    +              // slow or unreachable resource manager, register anyway and let the rm reconnect
    +              self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
    +              self ! decorateMessage(new DisconnectResourceManager(rm))
    +          }(context.dispatcher)
    +
    +        case None =>
    +          log.info("Task Manager Registration but not connected to ResourceManager")
    +          // ResourceManager not yet available
    +          // sending task manager information later upon ResourceManager registration
    +          self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
    +      }
    +
    +    case msg: RegisterResourceSuccessful =>
    +
    +      val originalMsg = msg.getRegistrationMessage
    +      val taskManager = msg.getTaskManager
    +
    +      // ResourceManager knows about the resource, now let's try to register TaskManager
           if (instanceManager.isRegistered(taskManager)) {
             val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
    -        // IMPORTANT: Send the response to the "sender", which is not the
    -        //   TaskManager actor, but the ask future!
    -        sender() ! decorateMessage(
    +        taskManager ! decorateMessage(
               AlreadyRegistered(
                 instanceID,
    -            libraryCacheManager.getBlobServerPort)
    -        )
    -      }
    -      else {
    +            libraryCacheManager.getBlobServerPort))
    +      } else {
             try {
               val instanceID = instanceManager.registerTaskManager(
                 taskManager,
    -            connectionInfo,
    -            hardwareInformation,
    -            numberOfSlots,
    +            originalMsg.resourceId,
    +            originalMsg.connectionInfo,
    +            originalMsg.resources,
    +            originalMsg.numberOfSlots,
                 leaderSessionID.orNull)
    -          // IMPORTANT: Send the response to the "sender", which is not the
    -          //   TaskManager actor, but the ask future!
    -          sender() !
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206091#comment-15206091 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56960595

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -312,59 +323,125 @@ class JobManager(
           leaderSessionID = None
    -    case RegisterTaskManager(
    -      connectionInfo,
    -      hardwareInformation,
    -      numberOfSlots) =>
    +    case msg: RegisterResourceManager =>
    +      log.debug(s"Resource manager registration: $msg")
    +
    +      // ditch current resource manager (if any)
    +      currentResourceManager = Option(msg.resourceManager())
    +
    +      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
    +        instance => instance.getResourceId).toList.asJava
    +
    +      // confirm registration and send known task managers with their resource ids
    +      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
    +
    +    case msg: DisconnectResourceManager =>
    +      log.debug(s"Resource manager disconnect: $msg")
    +
    +      currentResourceManager match {
    +        case Some(rm) if rm.equals(msg.resourceManager()) =>
    +          // we should ditch the current resource manager
    +          log.debug(s"Disconnecting resource manager $rm.")
    +          // send the old one a disconnect message
    +          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
    +          currentResourceManager = None
    +        case None =>
    +          // not connected, thus ignoring this message
    +          log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
    +      }
    +
    +    case msg @ RegisterTaskManager(
    +          resourceId,
    +          connectionInfo,
    +          hardwareInformation,
    +          numberOfSlots) =>
    +      // we are being informed by the ResourceManager that a new task manager is available
    +      log.debug(s"RegisterTaskManager: $msg")
           val taskManager = sender()
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
    +          future.onComplete {
    +            case scala.util.Success(response) =>
    +              // the resource manager is available and answered
    +              self ! response
    +            case scala.util.Failure(t) =>
    +              // slow or unreachable resource manager, register anyway and let the rm reconnect
    +              self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
    +              self ! decorateMessage(new DisconnectResourceManager(rm))
    +          }(context.dispatcher)
    +
    +        case None =>
    +          log.info("Task Manager Registration but not connected to ResourceManager")
    +          // ResourceManager not yet available
    +          // sending task manager information later upon ResourceManager registration
    +          self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
    +      }
    +
    +    case msg: RegisterResourceSuccessful =>
    +
    +      val originalMsg = msg.getRegistrationMessage
    +      val taskManager = msg.getTaskManager
    +
    +      // ResourceManager knows about the resource, now let's try to register TaskManager
           if (instanceManager.isRegistered(taskManager)) {
             val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
    -        // IMPORTANT: Send the response to the "sender", which is not the
    -        //   TaskManager actor, but the ask future!
    -        sender() ! decorateMessage(
    +        taskManager ! decorateMessage(
               AlreadyRegistered(
                 instanceID,
    -            libraryCacheManager.getBlobServerPort)
    -        )
    -      }
    -      else {
    +            libraryCacheManager.getBlobServerPort))
    +      } else {
             try {
               val instanceID = instanceManager.registerTaskManager(
                 taskManager,
    -            connectionInfo,
    -            hardwareInformation,
    -            numberOfSlots,
    +            originalMsg.resourceId,
    +            originalMsg.connectionInfo,
    +            originalMsg.resources,
    +            originalMsg.numberOfSlots,
                 leaderSessionID.orNull)
    -          // IMPORTANT: Send the response to the "sender", which is not the
    -          //   TaskManager actor, but the ask future!
    -          sender() !
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206078#comment-15206078 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56959658

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala ---
    @@ -87,7 +91,7 @@ object RegistrationMessages {
         extends RegistrationMessage
       /**
    -   * Denotes the unsuccessful registration of a task manager at the job manager. This is the
    +   * Denotes the unsuccessful registration of a task manager at the JobManager. This is the
    --- End diff --

    I've made it consistent.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204869#comment-15204869 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56875016 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,125 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None +case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + +case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { +case Some(rm) => + val future = (rm ? 
decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { +case scala.util.Success(response) => + // the resource manager is available and answered + self ! response +case scala.util.Failure(t) => + // slow or unreachable resource manager, register anyway and let the rm reconnect + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + self ! decorateMessage(new DisconnectResourceManager(rm)) + }(context.dispatcher) + +case None => + log.info("Task Manager Registration but not connected to ResourceManager") + // ResourceManager not yet available + // sending task manager information later upon ResourceManager registration + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + } + +case msg: RegisterResourceSuccessful => + + val originalMsg = msg.getRegistrationMessage + val taskManager = msg.getTaskManager + + // ResourceManager knows about the resource, now let's try to register TaskManager if (instanceManager.isRegistered(taskManager)) { val instanceID = instanceManager.getRegisteredInstance(taskManager).getId -// IMPORTANT: Send the response to the "sender", which is not the -//TaskManager actor, but the ask future! -sender() ! decorateMessage( +taskManager ! decorateMessage( AlreadyRegistered( instanceID, -libraryCacheManager.getBlobServerPort) -) - } - else { +libraryCacheManager.getBlobServerPort)) + } else { try { val instanceID = instanceManager.registerTaskManager( taskManager, -connectionInfo, -hardwareInformation, -numberOfSlots, +originalMsg.resourceId, +originalMsg.connectionInfo, +originalMsg.resources, +originalMsg.numberOfSlots, leaderSessionID.orNull) - // IMPORTANT: Send the response to the "sender", which is not the - //TaskManager actor, but the ask future! - sender() !
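The registration path in this diff asks the ResourceManager about the new TaskManager and, if the ask fails or times out, registers the TaskManager anyway and disconnects the ResourceManager. A minimal Java sketch of that ask-with-fallback pattern follows. It uses `CompletableFuture` rather than Akka, and the class name, method name, and string replies are hypothetical stand-ins, not Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; not part of Flink. Requires Java 9+ for orTimeout. */
public final class AskWithFallback {

    /** Stand-in for "register anyway and let the ResourceManager reconnect". */
    public static final String FALLBACK = "REGISTER_ANYWAY_AND_DISCONNECT_RM";

    private AskWithFallback() {}

    /**
     * Returns the ResourceManager's reply if it arrives in time, otherwise the
     * fallback decision. The returned future never completes exceptionally.
     */
    public static CompletableFuture<String> askResourceManager(
            CompletableFuture<String> pendingReply, long timeoutMillis) {
        return pendingReply
                // fail the ask if no reply arrives within the timeout
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // map both outcomes to a message the caller can act on
                .handle((reply, failure) -> failure == null ? reply : FALLBACK);
    }
}
```

Mapping both outcomes to a plain value mirrors the diff, where success and failure each end in a message sent to `self`.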
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204823#comment-15204823 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56872406 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala --- @@ -36,7 +38,7 @@ object RegistrationMessages { /** * Triggers the TaskManager to attempt a registration at the JobManager. * - * @param jobManagerURL Akka URL to the JobManager + * @param jobManagerURL Akka URL to the JobManager to ask for the JobManager --- End diff -- Reverted the comment. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204799#comment-15204799 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56870497 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java --- @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.standalone; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; + +import java.util.Collection; +import java.util.UUID; + +/** + * A standalone implementation of the resource manager. Used when the system is started in + * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos. 
+ */ +public class StandaloneResourceManager extends FlinkResourceManager { + + + public StandaloneResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) { + super(0, flinkConfig, leaderRetriever); + } + + // + // Framework specific behavior + // + + + @Override + protected void newJobManagerLeaderAvailable(String leaderAddress, UUID leaderSessionID) { + super.newJobManagerLeaderAvailable(leaderAddress, leaderSessionID); + } + + @Override + protected void initialize() throws Exception { + // nothing to initialize + } + + @Override + protected void leaderUpdated() { + // nothing to update + } + + @Override + protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + } + + @Override + protected void fatalError(String message, Throwable error) { + log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error); + LOG.error("Shutting down process"); --- End diff -- Yep, will change that. There is the Actor logger and the RM logger. No need to have both. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204787#comment-15204787 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56869299 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala --- @@ -29,22 +29,22 @@ object Messages { case object Acknowledge /** - * Signals that the receiver (JobManager/TaskManager) shall disconnect the sender. - * - * The TaskManager may send this on shutdown to let the JobManager realize the TaskManager - * loss more quickly. - * - * The JobManager may send this message to its TaskManagers to let them clean up their - * tasks that depend on the JobManager and go into a clean state. - * - * @param reason The reason for disconnecting, to be displayed in log and error messages. - */ - case class Disconnect(reason: String) extends RequiresLeaderSessionID +* Accessor for the case object instance, to simplify Java interoperability. +* +* @return The Acknowledge case object instance. +*/ + def getAcknowledge(): Acknowledge.type = Acknowledge /** - * Accessor for the case object instance, to simplify Java interoperability. - * - * @return The Acknowledge case object instance. - */ - def getAcknowledge(): Acknowledge.type = Acknowledge +* Signals that the receiver (JobManager/TaskManager) shall disconnect the sender. +* +* The TaskManager may send this on shutdown to let the JobManager realize the TaskManager +* loss more quickly. +* +* The JobManager may send this message to its TaskManagers to let them clean up their +* tasks that depend on the JobManager and go into a clean state. +* +* @param reason The reason for disconnecting, to be displayed in log and error messages. +*/ + case class Disconnect(reason: String) extends RequiresLeaderSessionID --- End diff -- You're right, there is no need to change the order here. 
I'll revert the changes.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204782#comment-15204782 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56869070 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -886,10 +957,42 @@ class JobManager( if (instanceManager.isRegistered(taskManager)) { log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.") -instanceManager.unregisterTaskManager(taskManager, false) + instanceManager.unregisterTaskManager(taskManager, false) context.unwatch(taskManager) } +case msg: StopCluster => + + log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " + +s"and diagnostics: ${msg.message()}") + + val respondTo = sender() + + // stop all task managers + instanceManager.getAllRegisteredInstances.asScala foreach { +instance => + instance.getActorGateway.tell(msg) + } + + // send resource manager the ok + currentResourceManager match { +case Some(rm) => + + // inform rm + rm ! decorateMessage(msg) + + respondTo ! decorateMessage(StopClusterSuccessful.get()) + + // trigger shutdown + shutdown() + +case None => + // retry + context.system.scheduler.scheduleOnce( +2 seconds, self, decorateMessage(msg) + )(context.dispatcher) --- End diff -- You're right. There should be an upper bound for the number of retries. If there was never a RM, then it behaves the same.
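The upper bound on retries discussed here could be sketched as a small decision function. This is only an illustration in Java under stated assumptions: the class, the enum, and the idea of carrying a retry counter alongside the `StopCluster` message are hypothetical and do not come from the pull request.

```java
/** Hypothetical illustration only; not part of Flink. */
public final class StopClusterRetryPolicy {

    /** What the JobManager could do when it handles a stop request. */
    public enum Action { NOTIFY_RM_AND_SHUTDOWN, RETRY_LATER, SHUTDOWN_WITHOUT_RM }

    private final int maxRetries;

    public StopClusterRetryPolicy(int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        this.maxRetries = maxRetries;
    }

    /**
     * @param resourceManagerConnected whether a ResourceManager is currently registered
     * @param retriesSoFar how often this stop request has already been rescheduled
     */
    public Action decide(boolean resourceManagerConnected, int retriesSoFar) {
        if (resourceManagerConnected) {
            return Action.NOTIFY_RM_AND_SHUTDOWN;
        }
        // no ResourceManager: retry a bounded number of times, then stop waiting
        return retriesSoFar < maxRetries ? Action.RETRY_LATER : Action.SHUTDOWN_WITHOUT_RM;
    }
}
```

With such a bound, a JobManager that never had a ResourceManager and one that lost its ResourceManager both shut down after the same number of attempts.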
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204711#comment-15204711 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56863532 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala --- @@ -36,7 +38,7 @@ object RegistrationMessages { /** * Triggers the TaskManager to attempt a registration at the JobManager. * - * @param jobManagerURL Akka URL to the JobManager + * @param jobManagerURL Akka URL to the JobManager to ask for the JobManager --- End diff -- For me it is the other way around. Maybe change this to "Akka URL to the JobManager to ask for the JobManager actor"?
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204706#comment-15204706 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56863204 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,125 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None +case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + +case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { +case Some(rm) => + val future = (rm ? 
decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { +case scala.util.Success(response) => + // the resource manager is available and answered + self ! response +case scala.util.Failure(t) => + // slow or unreachable resource manager, register anyway and let the rm reconnect + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + self ! decorateMessage(new DisconnectResourceManager(rm)) + }(context.dispatcher) + +case None => + log.info("Task Manager Registration but not connected to ResourceManager") + // ResourceManager not yet available + // sending task manager information later upon ResourceManager registration + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + } + +case msg: RegisterResourceSuccessful => + + val originalMsg = msg.getRegistrationMessage + val taskManager = msg.getTaskManager + + // ResourceManager knows about the resource, now let's try to register TaskManager if (instanceManager.isRegistered(taskManager)) { val instanceID = instanceManager.getRegisteredInstance(taskManager).getId -// IMPORTANT: Send the response to the "sender", which is not the -//TaskManager actor, but the ask future! -sender() ! decorateMessage( +taskManager ! decorateMessage( AlreadyRegistered( instanceID, -libraryCacheManager.getBlobServerPort) -) - } - else { +libraryCacheManager.getBlobServerPort)) + } else { try { val instanceID = instanceManager.registerTaskManager( taskManager, -connectionInfo, -hardwareInformation, -numberOfSlots, +originalMsg.resourceId, +originalMsg.connectionInfo, +originalMsg.resources, +originalMsg.numberOfSlots, leaderSessionID.orNull) - // IMPORTANT: Send the response to the "sender", which is not the - //TaskManager actor, but the ask future! - sender() !
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204672#comment-15204672 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56860338 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.standalone; + +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +/** + * This message is teh successful response to a heartbeat. + */ +public class HeartbeatAcknowledgement implements RequiresLeaderSessionID, java.io.Serializable { --- End diff -- You're looking at an old commit. I already removed this class but didn't want to force-push. 
> ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204671#comment-15204671 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56860326 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneMode.java --- @@ -16,24 +16,21 @@ * limitations under the License. */ -package org.apache.flink.runtime.execution; +package org.apache.flink.runtime.clusterframework.standalone; -public interface ExecutionObserver { +/** + * The startup mode for the standalone setup, + */ +public enum StandaloneMode { --- End diff -- You're looking at an old commit. I already removed this class but didn't want to force-push. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204663#comment-15204663 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56860123 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,125 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) --- End diff -- You're right that the message can be lost. Apart from the leader election service there is currently no mechanism to detect a lost connection from RM to the JM. The above code is part of a special case where the RM doesn't reply to a TM registration and the JM decides to disconnect the RM. I suppose we should keep retrying to send the TriggerRegistrationAtJobManager until we receive a registration of a RM. 
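One way to read the suggestion above is a periodic resend that stops once a ResourceManager registers. The Java sketch below models only that state, assuming some timer calls `onTick()` at a fixed interval. All names are hypothetical and the actual Akka scheduling is left out.

```java
/** Hypothetical illustration only; not part of Flink. */
public final class RegistrationTrigger {

    private boolean registered;
    private int sentCount;

    /**
     * Called on every timer tick. Returns true if the trigger message
     * (TriggerRegistrationAtJobManager in the diff) should be sent again.
     */
    public boolean onTick() {
        if (registered) {
            return false; // a ResourceManager has registered, stop resending
        }
        sentCount++;
        return true;
    }

    /** Called when a ResourceManager registration arrives. */
    public void onResourceManagerRegistered() {
        registered = true;
    }

    /** Number of trigger messages sent so far. */
    public int sentCount() {
        return sentCount;
    }
}
```

Because a lost message simply leads to another send on the next tick, this does not rely on any single message being delivered.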
> ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204629#comment-15204629 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56859466 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatMessage.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.standalone; + +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +import java.io.Serializable; + +import static java.util.Objects.requireNonNull; + +/** + * Heartbeat message send by a standalone TaskManager to the resource master. + */ +public class HeartbeatMessage implements RequiresLeaderSessionID, Serializable { --- End diff -- You're looking at an old commit. I already removed this class but didn't want to force-push. 
> ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204625#comment-15204625 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56859173 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.util.AbstractID; + +import java.io.Serializable; + +/** + * Class for Resource Ids assigned at the FlinkResourceManager. + */ +public class ResourceID implements Serializable { --- End diff -- In standalone mode, this is just a random String, but in YARN mode it is the container ID assigned by the framework and thus not random.
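The distinction between the two modes can be illustrated with a small value class. This is a sketch under assumptions, not the `ResourceID` class from the pull request: the class name, the factory methods, and the use of `UUID` for the random case are hypothetical.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;

/** Hypothetical illustration only; not Flink's ResourceID. */
public final class SketchResourceId implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String id;

    private SketchResourceId(String id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    /** Standalone mode: no framework assigns an ID, so a random one is generated. */
    public static SketchResourceId random() {
        return new SketchResourceId(UUID.randomUUID().toString());
    }

    /** Framework mode (e.g. YARN): reuse the container ID the framework assigned. */
    public static SketchResourceId ofContainer(String containerId) {
        return new SketchResourceId(containerId);
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof SketchResourceId && id.equals(((SketchResourceId) other).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return id;
    }
}
```

Wrapping the string in a type keeps both cases interchangeable for callers, which only compare IDs for equality.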
> ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204586#comment-15204586 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56856463 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -886,10 +957,42 @@ class JobManager( if (instanceManager.isRegistered(taskManager)) { log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.") -instanceManager.unregisterTaskManager(taskManager, false) + instanceManager.unregisterTaskManager(taskManager, false) context.unwatch(taskManager) } +case msg: StopCluster => + + log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " + +s"and diagnostics: ${msg.message()}") + + val respondTo = sender() + + // stop all task managers + instanceManager.getAllRegisteredInstances.asScala foreach { +instance => + instance.getActorGateway.tell(msg) + } + + // send resource manager the ok + currentResourceManager match { +case Some(rm) => + + // inform rm + rm ! decorateMessage(msg) + + respondTo ! decorateMessage(StopClusterSuccessful.get()) + + // trigger shutdown + shutdown() + +case None => + // retry + context.system.scheduler.scheduleOnce( +2 seconds, self, decorateMessage(msg) + )(context.dispatcher) --- End diff -- What if there was never a RM started? Would that make a difference? > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204596#comment-15204596 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56857119 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala --- @@ -87,7 +91,7 @@ object RegistrationMessages { extends RegistrationMessage /** - * Denotes the unsuccessful registration of a task manager at the job manager. This is the + * Denotes the unsuccessful registration of a task manager at the JobManager. This is the --- End diff -- This change and the change in line 74 don't seem to be consistent. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204594#comment-15204594 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56856958 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala --- @@ -36,7 +38,7 @@ object RegistrationMessages { /** * Triggers the TaskManager to attempt a registration at the JobManager. * - * @param jobManagerURL Akka URL to the JobManager + * @param jobManagerURL Akka URL to the JobManager to ask for the JobManager --- End diff -- I don't understand the comment here. Admittedly, before it was not much better. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204591#comment-15204591 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56856717 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala --- @@ -29,22 +29,22 @@ object Messages { case object Acknowledge /** - * Signals that the receiver (JobManager/TaskManager) shall disconnect the sender. - * - * The TaskManager may send this on shutdown to let the JobManager realize the TaskManager - * loss more quickly. - * - * The JobManager may send this message to its TaskManagers to let them clean up their - * tasks that depend on the JobManager and go into a clean state. - * - * @param reason The reason for disconnecting, to be displayed in log and error messages. - */ - case class Disconnect(reason: String) extends RequiresLeaderSessionID +* Accessor for the case object instance, to simplify Java interoperability. +* +* @return The Acknowledge case object instance. +*/ + def getAcknowledge(): Acknowledge.type = Acknowledge /** - * Accessor for the case object instance, to simplify Java interoperability. - * - * @return The Acknowledge case object instance. - */ - def getAcknowledge(): Acknowledge.type = Acknowledge +* Signals that the receiver (JobManager/TaskManager) shall disconnect the sender. +* +* The TaskManager may send this on shutdown to let the JobManager realize the TaskManager +* loss more quickly. +* +* The JobManager may send this message to its TaskManagers to let them clean up their +* tasks that depend on the JobManager and go into a clean state. +* +* @param reason The reason for disconnecting, to be displayed in log and error messages. +*/ + case class Disconnect(reason: String) extends RequiresLeaderSessionID --- End diff -- I don't see the reason for these changes here. 
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204584#comment-15204584 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56855904 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -886,10 +957,42 @@ class JobManager( if (instanceManager.isRegistered(taskManager)) { log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.") -instanceManager.unregisterTaskManager(taskManager, false) + instanceManager.unregisterTaskManager(taskManager, false) context.unwatch(taskManager) } +case msg: StopCluster => + + log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " + +s"and diagnostics: ${msg.message()}") + + val respondTo = sender() + + // stop all task managers + instanceManager.getAllRegisteredInstances.asScala foreach { +instance => + instance.getActorGateway.tell(msg) + } + + // send resource manager the ok + currentResourceManager match { +case Some(rm) => + + // inform rm + rm ! decorateMessage(msg) + + respondTo ! decorateMessage(StopClusterSuccessful.get()) + + // trigger shutdown + shutdown() + +case None => + // retry + context.system.scheduler.scheduleOnce( +2 seconds, self, decorateMessage(msg) + )(context.dispatcher) --- End diff -- What if we never re-establish a connection to the RM for some reason? Wouldn't that mean that we never shut down the JM?
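The concern raised in this comment (the `case None` branch reschedules `StopCluster` indefinitely) could be addressed by bounding the number of retries. The following is only a minimal sketch of that idea in plain Java, not the change that was made in the pull request; the class `StopClusterRetryPolicy`, its `Action` values, and the attempt counter are all hypothetical names introduced for illustration.

```java
import java.util.Optional;

/** Hypothetical sketch, not Flink code: bounds the StopCluster retry loop. */
final class StopClusterRetryPolicy {

    /** What the JobManager would do for one StopCluster attempt. */
    enum Action { NOTIFY_RM_AND_SHUT_DOWN, RETRY_LATER, SHUT_DOWN_WITHOUT_RM }

    private final int maxAttempts;

    StopClusterRetryPolicy(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1, was " + maxAttempts);
        }
        this.maxAttempts = maxAttempts;
    }

    /**
     * Decides how to handle one StopCluster attempt.
     *
     * @param resourceManager the currently connected ResourceManager, if any
     *                        (a String stands in for the actor reference)
     * @param attempt         1-based number of the current attempt
     */
    Action decide(Optional<String> resourceManager, int attempt) {
        if (resourceManager.isPresent()) {
            // Normal path: inform the RM, answer the requester, shut down.
            return Action.NOTIFY_RM_AND_SHUT_DOWN;
        }
        // No RM connected: retry a bounded number of times, then give up
        // waiting and shut down anyway so the JobManager cannot hang forever.
        return attempt < maxAttempts ? Action.RETRY_LATER : Action.SHUT_DOWN_WITHOUT_RM;
    }
}
```

In an actor-based implementation the attempt number would have to travel with the rescheduled message or be kept in actor state; which of the two fits the JobManager better is left open here.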
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204579#comment-15204579 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56855460 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,125 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None +case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + +case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { +case Some(rm) => + val future = (rm ? 
decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { +case scala.util.Success(response) => + // the resource manager is available and answered + self ! response +case scala.util.Failure(t) => + // slow or unreachable resource manager, register anyway and let the rm reconnect + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + self ! decorateMessage(new DisconnectResourceManager(rm)) + }(context.dispatcher) + +case None => + log.info("Task Manager Registration but not connected to ResourceManager") + // ResourceManager not yet available + // sending task manager information later upon ResourceManager registration + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + } + +case msg: RegisterResourceSuccessful => + + val originalMsg = msg.getRegistrationMessage + val taskManager = msg.getTaskManager + + // ResourceManager knows about the resource, now let's try to register TaskManager if (instanceManager.isRegistered(taskManager)) { val instanceID = instanceManager.getRegisteredInstance(taskManager).getId -// IMPORTANT: Send the response to the "sender", which is not the -//TaskManager actor, but the ask future! -sender() ! decorateMessage( +taskManager ! decorateMessage( AlreadyRegistered( instanceID, -libraryCacheManager.getBlobServerPort) -) - } - else { +libraryCacheManager.getBlobServerPort)) + } else { try { val instanceID = instanceManager.registerTaskManager( taskManager, -connectionInfo, -hardwareInformation, -numberOfSlots, +originalMsg.resourceId, +originalMsg.connectionInfo, +originalMsg.resources, +originalMsg.numberOfSlots, leaderSessionID.orNull) - // IMPORTANT: Send the response to the "sender", which is not the - //TaskManager actor, but the ask future! - sender() !
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204578#comment-15204578 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56855273 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,125 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) --- End diff -- What happens if this message never reaches the ResourceManager? Is there a means by which the RM can detect that it lost its connection to the JM?
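One conventional answer to the question in this comment is a timeout: if the ResourceManager has heard nothing from the JobManager for some interval, it assumes the connection is lost and registers again. The sketch below shows only that bookkeeping in plain Java. It is an assumption about how such detection could work, not a description of what the pull request does; the class `JobManagerLivenessSketch` and its methods are hypothetical, and timestamps are passed in explicitly so the logic stays deterministic.

```java
/** Hypothetical sketch, not Flink code: timeout-based detection of a lost JobManager. */
final class JobManagerLivenessSketch {

    private final long timeoutMillis;
    private long lastSeenMillis;

    JobManagerLivenessSketch(long timeoutMillis, long nowMillis) {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeoutMillis must be positive, was " + timeoutMillis);
        }
        this.timeoutMillis = timeoutMillis;
        this.lastSeenMillis = nowMillis;
    }

    /** Records that some message from the JobManager arrived at the given time. */
    void onJobManagerMessage(long nowMillis) {
        lastSeenMillis = nowMillis;
    }

    /** Returns true once the JobManager has been silent for longer than the timeout. */
    boolean shouldReRegister(long nowMillis) {
        return nowMillis - lastSeenMillis > timeoutMillis;
    }
}
```

With a check like this the ResourceManager would not depend on the `TriggerRegistrationAtJobManager` message being delivered, at the cost of requiring periodic traffic from the JobManager.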
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204567#comment-15204567 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56854489 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneMode.java --- @@ -16,24 +16,21 @@ * limitations under the License. */ -package org.apache.flink.runtime.execution; +package org.apache.flink.runtime.clusterframework.standalone; -public interface ExecutionObserver { +/** + * The startup mode for the standalone setup, + */ +public enum StandaloneMode { --- End diff -- Class is not used
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204564#comment-15204564 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56854344 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.standalone; + +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +/** + * This message is teh successful response to a heartbeat. + */ +public class HeartbeatAcknowledgement implements RequiresLeaderSessionID, java.io.Serializable { --- End diff -- Class is not used > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204566#comment-15204566 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56854368 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatMessage.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.standalone; + +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +import java.io.Serializable; + +import static java.util.Objects.requireNonNull; + +/** + * Heartbeat message send by a standalone TaskManager to the resource master. 
+ */ +public class HeartbeatMessage implements RequiresLeaderSessionID, Serializable { --- End diff -- Class is not used
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204555#comment-15204555 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56853621 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -886,10 +957,42 @@ class JobManager( if (instanceManager.isRegistered(taskManager)) { log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.") -instanceManager.unregisterTaskManager(taskManager, false) + instanceManager.unregisterTaskManager(taskManager, false) context.unwatch(taskManager) } +case msg: StopCluster => + + log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " + +s"and diagnostics: ${msg.message()}") + + val respondTo = sender() + + // stop all task managers + instanceManager.getAllRegisteredInstances.asScala foreach { +instance => + instance.getActorGateway.tell(msg) + } + + // send resource manager the ok + currentResourceManager match { +case Some(rm) => + + // inform rm + rm ! decorateMessage(msg) + + respondTo ! decorateMessage(StopClusterSuccessful.get()) --- End diff -- Maybe rename `get` to `getInstance`.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204550#comment-15204550 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56853405 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -886,10 +957,42 @@ class JobManager( if (instanceManager.isRegistered(taskManager)) { log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.") -instanceManager.unregisterTaskManager(taskManager, false) + instanceManager.unregisterTaskManager(taskManager, false) --- End diff -- Formatting regression
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204506#comment-15204506 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56847510 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java --- @@ -21,7 +21,7 @@ /** * Interface for message decorators */ -public interface MessageDecorator { +public interface MessageDecorator extends java.io.Serializable { --- End diff -- Good catch :-)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204491#comment-15204491 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56845833 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.util.AbstractID; + +import java.io.Serializable; + +/** + * Class for Resource Ids assigned at the FlinkResourceManager. + */ +public class ResourceID implements Serializable { --- End diff -- Why don't we use `AbstractID` here? > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204476#comment-15204476 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56844658 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java --- @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.standalone; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; + +import java.util.Collection; +import java.util.UUID; + +/** + * A standalone implementation of the resource manager. Used when the system is started in + * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos. 
+ */ +public class StandaloneResourceManager extends FlinkResourceManager { + + + public StandaloneResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) { + super(0, flinkConfig, leaderRetriever); + } + + // + // Framework specific behavior + // + + + @Override + protected void newJobManagerLeaderAvailable(String leaderAddress, UUID leaderSessionID) { + super.newJobManagerLeaderAvailable(leaderAddress, leaderSessionID); + } + + @Override + protected void initialize() throws Exception { + // nothing to initialize + } + + @Override + protected void leaderUpdated() { + // nothing to update + } + + @Override + protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + } + + @Override + protected void fatalError(String message, Throwable error) { + log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error); + LOG.error("Shutting down process"); --- End diff -- Why do we use two different logger variables here?
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199663#comment-15199663 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56518228 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java --- @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; + +import com.google.common.base.Preconditions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.InfoMessage; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResource; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful; +import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RemoveResource; +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; +import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; +import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener; +import 
org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; + +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * + * Worker allocation steps + * + * + * The resource manager decides to request more workers. This can happen in order + * to fill the initial pool, or as a result of the JobManager requesting more workers. + * + * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests + * for more containers. After that, the {@link #getNumWorkerRequestsPending()} + * should reflect the pending
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199284#comment-15199284 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56484044 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java --- @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; + +import com.google.common.base.Preconditions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.InfoMessage; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResource; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful; +import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RemoveResource; +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; +import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; +import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener; +import 
org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; + +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * + * Worker allocation steps + * + * + * The resource manager decides to request more workers. This can happen in order + * to fill the initial pool, or as a result of the JobManager requesting more workers. + * + * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests + * for more containers. After that, the {@link #getNumWorkerRequestsPending()} + * should reflect the pending
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199243#comment-15199243 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56479617 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.messages; + +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +/** + * Message sent to the resource master actor to adjust the designated number of + * workers it maintains. 
+ */ +public class SetWorkerPoolSize implements RequiresLeaderSessionID, java.io.Serializable{ + + private static final long serialVersionUID = -335911350781207609L; + + private final int numberOfWorkers; + + public SetWorkerPoolSize(int numberOfWorkers) { + if (numberOfWorkers < 0) { + throw new IllegalArgumentException(); --- End diff -- Added
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199289#comment-15199289 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56484759 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java --- @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; + +import com.google.common.base.Preconditions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.InfoMessage; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResource; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful; +import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RemoveResource; +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; +import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; +import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener; +import 
org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; + +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * + * Worker allocation steps + * + * + * The resource manager decides to request more workers. This can happen in order + * to fill the initial pool, or as a result of the JobManager requesting more workers. + * + * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests + * for more containers. After that, the {@link #getNumWorkerRequestsPending()} + * should reflect the pending requests.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197686#comment-15197686 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56373341 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.messages; + +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +/** + * Message sent to the resource master actor to adjust the designated number of + * workers it maintains. 
+ */ +public class SetWorkerPoolSize implements RequiresLeaderSessionID, java.io.Serializable{ + + private static final long serialVersionUID = -335911350781207609L; + + private final int numberOfWorkers; + + public SetWorkerPoolSize(int numberOfWorkers) { + if (numberOfWorkers < 0) { + throw new IllegalArgumentException(); --- End diff -- Exception message could be helpful
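The review suggestion above (give the `IllegalArgumentException` a message) could look roughly like the following. This is a minimal sketch, not the code that was merged: the class name `SetWorkerPoolSizeSketch`, the message wording, the accessor name, and the `serialVersionUID` value are all assumptions made for illustration, and the `RequiresLeaderSessionID` marker interface is left out so the snippet compiles on its own.

```java
import java.io.Serializable;

/**
 * Sketch of a pool-size message whose constructor explains why it
 * rejected its argument. Hypothetical stand-in for SetWorkerPoolSize.
 */
final class SetWorkerPoolSizeSketch implements Serializable {

    private static final long serialVersionUID = 1L; // placeholder value for this sketch

    private final int numberOfWorkers;

    SetWorkerPoolSizeSketch(int numberOfWorkers) {
        if (numberOfWorkers < 0) {
            // Hypothetical wording; the message text in the actual PR may differ.
            throw new IllegalArgumentException(
                "Number of workers must not be negative, but was " + numberOfWorkers);
        }
        this.numberOfWorkers = numberOfWorkers;
    }

    /** Returns the requested pool size (accessor name is an assumption). */
    int numberOfWorkers() {
        return numberOfWorkers;
    }
}
```

Including the offending value in the message means a failure in the logs can be diagnosed without reproducing the call.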
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197708#comment-15197708 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56375125 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatus.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.messages; + +/** + * This message signals the resource master to check how many TaskManagers are + * desired, how many are available, and to trigger adjustments if needed. + */ +public class GetClusterStatus implements java.io.Serializable { --- End diff -- Class is not used according to IntelliJ.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199239#comment-15199239 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56479487 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java --- @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; + +import com.google.common.base.Preconditions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.InfoMessage; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResource; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful; +import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RemoveResource; +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; +import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; +import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener; +import 
org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; + +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * + * Worker allocation steps + * + * + * The resource manager decides to request more workers. This can happen in order + * to fill the initial pool, or as a result of the JobManager requesting more workers. + * + * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests + * for more containers. After that, the {@link #getNumWorkerRequestsPending()} + * should reflect the pending requests.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199283#comment-15199283 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56483912 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class describes the basic parameters for launching a TaskManager process. 
+ */ +public class ContaineredTaskManagerParameters implements java.io.Serializable { + + private static final long serialVersionUID = -3096987654278064670L; + + /** Total container memory, in bytes */ + private final long totalContainerMemoryMB; + + /** Heap size to be used for the Java process */ + private final long taskManagerHeapSizeMB; + + /** Direct memory limit for the Java process */ + private final long taskManagerDirectMemoryLimitMB; + + /** The number of slots per TaskManager */ + private final int numSlots; + + /** Environment variables to add to the Java process */ + private final HashMap<String, String> taskManagerEnv; + + + public ContaineredTaskManagerParameters( + long totalContainerMemoryMB, long taskManagerHeapSizeMB, --- End diff -- I think I would either place every parameter on its own line or place multiple parameters on every line. I don't think that multiple parameters on the first line followed by a single parameter on each following line is the most intuitive pattern.
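To illustrate the first layout the reviewer describes (one parameter per line), here is a small sketch. It is an assumption-laden stand-in, not the PR's code: the class name `TaskManagerParametersSketch` is invented, only the first two constructor parameters are visible in the quoted diff, and the remaining three are inferred from the field declarations. The accessors and the defensive copy of the map are also additions for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for ContaineredTaskManagerParameters. */
final class TaskManagerParametersSketch {

    private final long totalContainerMemoryMB;
    private final long taskManagerHeapSizeMB;
    private final long taskManagerDirectMemoryLimitMB;
    private final int numSlots;
    private final HashMap<String, String> taskManagerEnv;

    // One parameter per line: every argument starts at the same column,
    // so adding or removing one touches exactly one line in a diff.
    TaskManagerParametersSketch(
            long totalContainerMemoryMB,
            long taskManagerHeapSizeMB,
            long taskManagerDirectMemoryLimitMB,
            int numSlots,
            Map<String, String> taskManagerEnv) {

        this.totalContainerMemoryMB = totalContainerMemoryMB;
        this.taskManagerHeapSizeMB = taskManagerHeapSizeMB;
        this.taskManagerDirectMemoryLimitMB = taskManagerDirectMemoryLimitMB;
        this.numSlots = numSlots;
        // Defensive copy so later changes to the caller's map are not visible here.
        this.taskManagerEnv = new HashMap<>(taskManagerEnv);
    }

    long totalContainerMemoryMB() { return totalContainerMemoryMB; }

    long taskManagerHeapSizeMB() { return taskManagerHeapSizeMB; }

    long taskManagerDirectMemoryLimitMB() { return taskManagerDirectMemoryLimitMB; }

    int numSlots() { return numSlots; }

    Map<String, String> taskManagerEnv() { return taskManagerEnv; }
}
```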
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199245#comment-15199245 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56479659 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ShutdownClusterAfterJob.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.messages; + +import org.apache.flink.api.common.JobID; + +/** + * Message sent to the cluster framework master to signal it that the cluster + * should be shut down upon completion of a certain job. + */ +public class ShutdownClusterAfterJob implements java.io.Serializable { --- End diff -- This should replace the Yarn shutdown logic. I've integrated it now to replace `StopAMAfterJob`. 
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199246#comment-15199246 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56479664 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatus.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.messages; + +/** + * This message signals the resource master to check how many TaskManagers are + * desired, how many are available, and to trigger adjustments if needed. + */ +public class GetClusterStatus implements java.io.Serializable { --- End diff -- Same here. General mechanism. Implemented now to replace `PollYarnClusterStatus`. 
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199240#comment-15199240 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56479509 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java --- @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; + +import com.google.common.base.Preconditions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.InfoMessage; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResource; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful; +import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RemoveResource; +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; +import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; +import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener; +import 
org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; + +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * + * Worker allocation steps + * + * + * The resource manager decides to request more workers. This can happen in order + * to fill the initial pool, or as a result of the JobManager requesting more workers. + * + * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests + * for more containers. After that, the {@link #getNumWorkerRequestsPending()} + * should reflect the pending requests.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197682#comment-15197682 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56372798 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListenerSuccessful.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.messages; + +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +/** + * This message signals to the application client that the registration was successful. + */ +public class RegisterInfoMessageListenerSuccessful implements RequiresLeaderSessionID, java.io.Serializable { + + private static final long serialVersionUID = 7808628311617273755L; + + /** The singleton instance */ + private static final RegisterInfoMessageListenerSuccessful INSTANCE = new RegisterInfoMessageListenerSuccessful(); + + /** +* Gets the singleton instance. +* @return The singleton instance. 
+*/ + public static RegisterInfoMessageListenerSuccessful get() { + return INSTANCE; + } + + // + + /** Private constructor to prevent instantiation */ + private RegisterInfoMessageListenerSuccessful() {} + + // + + @Override + public boolean equals(Object obj) { + return obj != null && obj.getClass() == RegisterInfoMessageListenerSuccessful.class; + } + + @Override + public int hashCode() { + return 2018741654; --- End diff -- This class and the `RegisterInfoMessageListener` message have the same hash value.
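Equal hash codes on two unrelated classes do not break the `equals`/`hashCode` contract, but a shared hand-picked literal is easy to carry over when one singleton message is copied from another. One possible way to avoid that, sketched here under assumptions, is to derive the hash from the class name, which makes distinct values very likely. The two class names below are invented for illustration and the `RequiresLeaderSessionID` interface is omitted so the snippet stands alone; this is not necessarily how the PR resolved the comment.

```java
import java.io.Serializable;

/** Hypothetical singleton message: "listener registration succeeded". */
final class ListenerRegisteredSketch implements Serializable {

    private static final long serialVersionUID = 1L; // placeholder value

    private static final ListenerRegisteredSketch INSTANCE = new ListenerRegisteredSketch();

    static ListenerRegisteredSketch get() {
        return INSTANCE;
    }

    private ListenerRegisteredSketch() {}

    @Override
    public boolean equals(Object obj) {
        // Class-based equality: any instance of this exact class is equal.
        return obj != null && obj.getClass() == ListenerRegisteredSketch.class;
    }

    @Override
    public int hashCode() {
        // Derived from the class name instead of a literal, so a copied
        // sibling class does not silently keep the same constant.
        return ListenerRegisteredSketch.class.getName().hashCode();
    }
}

/** Hypothetical singleton message: "please register this listener". */
final class RegisterListenerSketch implements Serializable {

    private static final long serialVersionUID = 1L; // placeholder value

    private static final RegisterListenerSketch INSTANCE = new RegisterListenerSketch();

    static RegisterListenerSketch get() {
        return INSTANCE;
    }

    private RegisterListenerSketch() {}

    @Override
    public boolean equals(Object obj) {
        return obj != null && obj.getClass() == RegisterListenerSketch.class;
    }

    @Override
    public int hashCode() {
        return RegisterListenerSketch.class.getName().hashCode();
    }
}
```

Because `String.hashCode()` is specified by the Java platform, the value stays stable across JVMs, which keeps it consistent with the class-based `equals` after serialization.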
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197622#comment-15197622 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56366895

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---
    @@ -0,0 +1,796 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSelection;
    +import akka.actor.ActorSystem;
    +import akka.actor.Props;
    +import akka.dispatch.OnComplete;
    +import akka.pattern.Patterns;
    +import akka.util.Timeout;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.akka.FlinkUntypedActor;
    +import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
    +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
    +import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
    +import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
    +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
    +import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
    +import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
    +
    +import org.apache.flink.runtime.messages.RegistrationMessages;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + *
    + * Worker allocation steps
    + *
    + *
    + * The resource manager decides to request more workers. This can happen in order
    + * to fill the initial pool, or as a result of the JobManager requesting more workers.
    + *
    + * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
    + * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
    + * should reflect the pending
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199247#comment-15199247 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56479675

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework.messages;
    +
    +import java.io.Serializable;
    +
    +public class GetClusterStatusResponse implements Serializable {
    --- End diff --

    Thanks. Also integrated and replaces `FlinkYarnClusterStatus`.
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197651#comment-15197651 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56368934

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---
    @@ -0,0 +1,796 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSelection;
    +import akka.actor.ActorSystem;
    +import akka.actor.Props;
    +import akka.dispatch.OnComplete;
    +import akka.pattern.Patterns;
    +import akka.util.Timeout;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.akka.FlinkUntypedActor;
    +import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
    +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
    +import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
    +import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
    +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
    +import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
    +import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
    +
    +import org.apache.flink.runtime.messages.RegistrationMessages;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + *
    + * Worker allocation steps
    + *
    + *
    + * The resource manager decides to request more workers. This can happen in order
    + * to fill the initial pool, or as a result of the JobManager requesting more workers.
    + *
    + * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
    + * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
    + * should reflect the pending
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197733#comment-15197733 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56377258

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework.standalone;
    +
    +import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
    +
    +/**
    + * This message is teh successful response to a heartbeat.
    --- End diff --

    typo: teh -> the
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197684#comment-15197684 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56373029

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManagerSuccessful.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework.messages;
    +
    +import akka.actor.ActorRef;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Message that informs the resource manager that the JobManager accepted its registration.
    + * Carries information about the JobManager, and about the TaskManagers that the JobManager
    + * still has registered.
    + */
    +public class RegisterResourceManagerSuccessful implements RequiresLeaderSessionID, Serializable {
    +
    +    private static final long serialVersionUID = 817011779310941753L;
    +
    +    /** The JobManager which we registered with. */
    +    private final ActorRef jobManager;
    +
    +    /** The list of registered TaskManagers that the JobManager currently knows */
    +    private final Collection<ResourceID> currentlyRegisteredTaskManagers;
    +
    +
    --- End diff --

    Two line breaks
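
For readers following the review, the shape of the message class quoted above (final fields, a `serialVersionUID`, and `requireNonNull` checks) can be illustrated with a minimal sketch. This is not the code from the pull request: `RegistrationReply` and its accessors are hypothetical, and plain strings stand in for `ActorRef` and `ResourceID` so that the example compiles without Akka or Flink on the classpath.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

import static java.util.Objects.requireNonNull;

/**
 * Hypothetical stand-in for a registration reply message.
 * Strings replace ActorRef and ResourceID; this is an illustration only.
 */
final class RegistrationReply implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Address of the JobManager that accepted the registration (assumed representation). */
    private final String jobManagerAddress;

    /** IDs of the TaskManagers the JobManager still has registered. */
    private final Collection<String> registeredTaskManagers;

    RegistrationReply(String jobManagerAddress, Collection<String> registeredTaskManagers) {
        this.jobManagerAddress = requireNonNull(jobManagerAddress);
        // Defensive copy, so later changes to the caller's collection do not leak into the message.
        this.registeredTaskManagers = Collections.unmodifiableList(
                new ArrayList<>(requireNonNull(registeredTaskManagers)));
    }

    String jobManagerAddress() {
        return jobManagerAddress;
    }

    Collection<String> registeredTaskManagers() {
        return registeredTaskManagers;
    }
}
```

Rejecting nulls in the constructor means a malformed message fails where it is created rather than later inside the receiving actor.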
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199278#comment-15199278 ]

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56483710

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.actor.Address;
    +import com.typesafe.config.Config;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.runtime.webmonitor.WebMonitor;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.slf4j.Logger;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileWriter;
    +import java.io.IOException;
    +import java.io.PrintWriter;
    +import java.net.BindException;
    +import java.net.ServerSocket;
    +import java.util.Iterator;
    +
    +/**
    + * Tools for starting JobManager and TaskManager processes, including the
    + * Actor Systems used to run the JobManager and TaskManager actors.
    + */
    +public class BootstrapTools {
    +
    +    /**
    +     * Starts an ActorSystem with the given configuration listening at the address/ports.
    +     * @param configuration The Flink configuration
    +     * @param listeningAddress The address to listen at.
    +     * @param portRangeDefinition The port range to choose a port from.
    +     * @param logger The logger to output log information.
    +     * @return The ActorSystem which has been started
    +     * @throws Exception
    +     */
    +    public static ActorSystem startActorSystem(Configuration configuration,
    +            String listeningAddress,
    +            String portRangeDefinition,
    +            Logger logger) throws Exception {
    +
    +        // parse port range definition and create port iterator
    +        Iterator<Integer> portsIterator;
    +        try {
    +            portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +        } catch (Exception e) {
    +            throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +        }
    +
    +        while (portsIterator.hasNext()) {
    +            // first, we check if the port is available by opening a socket
    +            // if the actor system fails to start on the port, we try further
    +            ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    +                portsIterator,
    +                new NetUtils.SocketFactory() {
    +                    @Override
    +                    public ServerSocket createSocket(int port) throws IOException {
    +                        return new ServerSocket(port);
    +                    }
    +                });
    +
    +            int port;
    +            if (availableSocket == null) {
    +                throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
    +            } else {
    +                port =
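
The port-selection idea in the quoted `startActorSystem` hunk (walk a port range and take the first port on which a `ServerSocket` can be opened) can be sketched in isolation. The class below is a hypothetical illustration using only the JDK; `PortRangeProbe`, `parseRange`, and `firstAvailable` are invented names, and the simplified "start-end" parsing is an assumption rather than the behaviour of Flink's `NetUtils`.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper, not part of Flink: probes a port range for a bindable port. */
final class PortRangeProbe {

    /** Expands a single port ("6123") or a simple range ("50100-50200") into an iterator. */
    static Iterator<Integer> parseRange(String definition) {
        String[] parts = definition.trim().split("-");
        if (parts.length < 1 || parts.length > 2) {
            throw new IllegalArgumentException("Invalid port range definition: " + definition);
        }
        int start = Integer.parseInt(parts[0].trim());
        int end = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : start;
        if (start < 0 || end > 65535 || start > end) {
            throw new IllegalArgumentException("Invalid port range definition: " + definition);
        }
        List<Integer> ports = new ArrayList<>();
        for (int p = start; p <= end; p++) {
            ports.add(p);
        }
        return ports.iterator();
    }

    /** Returns the first socket that could be bound, or null once the iterator is exhausted. */
    static ServerSocket firstAvailable(Iterator<Integer> ports) {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return new ServerSocket(port);
            } catch (IOException e) {
                // Port is in use or otherwise not bindable: fall through and try the next one.
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        ServerSocket socket = firstAvailable(parseRange("50100-50200"));
        if (socket == null) {
            throw new BindException("Unable to allocate further port in port range");
        }
        int port = socket.getLocalPort();
        // Release the probe socket before the real service binds to the port.
        socket.close();
        System.out.println("Port selected for the actor system: " + port);
    }
}
```

Because the probe socket is closed before the real service binds, another process can claim the port in between. That is presumably why the quoted code keeps iterating if the actor system fails to start on the chosen port.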