[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15215832#comment-15215832
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1741


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210126#comment-15210126
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1741#issuecomment-200803051
  
Continuing my review now, but don't block this on me. If I find anything 
crucial, I will open a pull request against master if it is merged by then.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210037#comment-15210037
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57295865
  
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -240,6 +246,37 @@
 */
public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
 
+   
+   //  Common Resource Framework Configuration (YARN & Mesos)
+
+   /**
+* Percentage of heap space to remove from containers (YARN / Mesos), to compensate
+* for other JVM memory usage.
+*/
+   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = "containered.heap-cutoff-ratio";
+
+   /**
+* Minimum amount of heap memory to remove in containers, as a safety margin.
+*/
+   public static final String CONTAINERED_HEAP_CUTOFF_MIN = "containered.heap-cutoff-min";
+
+   /**
+* Prefix for passing custom environment variables to Flink's master process.
+* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
+* yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+* in the flink-conf.yaml.
+*/
+   public static final String CONTAINERED_MASTER_ENV_PREFIX = "containered.application-master.env.";
+
+   /**
+* Similar to the {@see CONTAINERED_MASTER_ENV_PREFIX}, this configuration prefix allows
+* setting custom environment variables for the workers (TaskManagers)
+*/
+   public static final String CONTAINERED_TASK_MANAGER_ENV_PREFIX = "containered.taskmanager.env.";
--- End diff --

I thought about these prefixes again. I think they make sense, but we could 
possibly change them before the release in a follow-up.
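
For readers unfamiliar with these options, the following is a minimal Java sketch of how the two heap cutoff settings and an environment prefix might be applied. It is not code from the pull request: the class `ContaineredConfigSketch`, both method names, and the reading of the ratio as a fraction between 0 and 1 are assumptions made for illustration. The prefix in the usage note follows the constant's value (`containered.application-master.env.`), not the `yarn.` prefix shown in the Javadoc example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the pull request. */
final class ContaineredConfigSketch {

    /**
     * Returns the heap size (in MB) left after removing the cutoff from a container.
     * Assumption: the cutoff is the larger of (container size * ratio) and the minimum.
     */
    static long heapAfterCutoff(long containerMemoryMB, double cutoffRatio, long cutoffMinMB) {
        if (cutoffRatio < 0.0 || cutoffRatio >= 1.0) {
            throw new IllegalArgumentException("cutoff ratio must be in [0, 1)");
        }
        long cutoff = Math.max((long) (containerMemoryMB * cutoffRatio), cutoffMinMB);
        if (cutoff >= containerMemoryMB) {
            throw new IllegalArgumentException("cutoff leaves no heap memory");
        }
        return containerMemoryMB - cutoff;
    }

    /** Collects all entries whose key starts with the prefix, with the prefix stripped. */
    static Map<String, String> envWithPrefix(Map<String, String> conf, String prefix) {
        Map<String, String> env = new HashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                env.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return env;
    }
}
```

Under these assumptions, a 1024 MB container with a ratio of 0.25 and a minimum of 384 MB would keep 640 MB of heap, because the minimum (384 MB) exceeds 25% of the container (256 MB). A configuration entry `containered.application-master.env.LD_LIBRARY_PATH` would yield the environment variable `LD_LIBRARY_PATH`.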




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210040#comment-15210040
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1741#issuecomment-200769986
  
I've incorporated the changes and the tests pass. I would like to merge the 
pull request. Please let me know if there are still pending code reviews.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206875#comment-15206875
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1741#issuecomment-199934108
  
Thanks a lot @tillrohrmann for taking the time to look into the bulk of 
code. I have already eagerly addressed most of your comments in the additional 
commits I pushed. Next, I'll revise the cluster shutdown logic and the 
re-connect in case of unresponsiveness of the resource manager.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206850#comment-15206850
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57031901
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -312,59 +323,121 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
--- End diff --

Done (doesn't change the diff).




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206835#comment-15206835
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1741#issuecomment-199920379
  
Great work @mxm! I really like the new architecture :-)

I had some minor comments. The only thing which is important to fix is that 
the `JobManager` can terminate if it is not connected to a 
`FlinkResourceManager`. Apart from that, I think we should merge it soon so 
that it can get some exposure. 




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206832#comment-15206832
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57030485
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.yarn.messages.ContainersAllocated;
+import org.apache.flink.yarn.messages.ContainersComplete;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Specialized Flink Resource Manager implementation for YARN clusters. It is started as the
+ * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure
+ * monitoring.
+ */
+public class YarnFlinkResourceManager extends FlinkResourceManager {
+   
+   /** The heartbeat interval while the resource master is waiting for containers */
+   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+   /** The default heartbeat interval during regular operation */
+   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+   /** The containers where a TaskManager is starting and we are waiting for it to register */
+   private final Map containersInLaunch;
+
+   /** Containers we have released, where we are waiting for an acknowledgement that
+* they are released */
+   private final Map containersBeingReturned;
+
+   /** The YARN / Hadoop configuration object */
+   private final YarnConfiguration yarnConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final ContainerLaunchContext taskManagerLaunchContext;
+
+   /** Host name for the container 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206815#comment-15206815
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57028233
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -312,59 +323,121 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
--- End diff --

Could be good to log the failure.
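
The suggestion is to make the failure branch of the registration reply visible in the logs. A minimal sketch of that idea follows, written in Java with `CompletableFuture` and `java.util.logging` instead of the Akka ask pattern and Scala `Future` used in the diff. The class name, method name, and log message are hypothetical and are not taken from the pull request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical illustration of logging a failed asynchronous reply. */
final class RegistrationFailureLoggingSketch {

    private static final Logger LOG =
            Logger.getLogger("RegistrationFailureLoggingSketch");

    /**
     * Returns a future with the same outcome as the given reply.
     * If the reply failed, the cause is logged before the outcome is passed on.
     */
    static CompletableFuture<String> withFailureLogging(
            CompletableFuture<String> reply, String taskManager) {
        return reply.whenComplete((response, failure) -> {
            if (failure != null) {
                // Without this branch, a timeout or a lost resource manager goes unnoticed.
                LOG.log(Level.WARNING,
                        "Resource manager did not answer registration of " + taskManager,
                        failure);
            }
        });
    }
}
```

The success path is left untouched here; only the failure case gains a log entry, which matches the narrow scope of the review comment.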




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206695#comment-15206695
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57017514
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.yarn.messages.ContainersAllocated;
+import org.apache.flink.yarn.messages.ContainersComplete;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Specialized Flink Resource Manager implementation for YARN clusters. It is started as the
+ * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure
+ * monitoring.
+ */
+public class YarnFlinkResourceManager extends FlinkResourceManager {
+   
+   /** The heartbeat interval while the resource master is waiting for containers */
+   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+   /** The default heartbeat interval during regular operation */
+   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+   /** The containers where a TaskManager is starting and we are waiting for it to register */
+   private final Map containersInLaunch;
+
+   /** Containers we have released, where we are waiting for an acknowledgement that
+* they are released */
+   private final Map containersBeingReturned;
+
+   /** The YARN / Hadoop configuration object */
+   private final YarnConfiguration yarnConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final ContainerLaunchContext taskManagerLaunchContext;
+
+   /** Host name for the 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206753#comment-15206753
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57021932
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
+
+   private static ActorSystem system;
+
+   private static Configuration config = new Configuration();
+
+   @Before
+   public void setup() {
+   system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+   }
+
+   @After
+   public void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Tests whether the resource manager connects and reconciles existing task managers.
+*/
+   @Test
+   public void testResourceManagerReconciliation() {
+
+   new JavaTestKit(system){{
+   new Within(duration("10 seconds")) {
+   @Override
+   protected void run() {
+
+   ActorGateway jobManager = TestingUtils.createJobManager(system, config);
+   ActorGateway me =
+   TestingUtils.createForwardingActor(system, getTestActor(), Option.empty());
+
+   // !! no resource manager started !!
+
+   ResourceID resourceID = ResourceID.generate();
+
+   jobManager.tell(
+   new RegistrationMessages.RegisterTaskManager(
+   resourceID,
+   Mockito.mock(InstanceConnectionInfo.class),
+   null,
+   1),
+   me);
+
+   expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
+
+   // now start the resource manager
+   ActorGateway resourceManager =
+   TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+   // register at testing job manager to receive a message once a resource manager registers
+   resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+   // Wait for resource manager
+   expectMsgEquals(Messages.getAcknowledge());
+
+   // check if we registered the task manager resource
+   

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206703#comment-15206703
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57018050
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---
@@ -115,7 +115,7 @@ class ApplicationClient(
   val jobManager = context.actorSelection(jobManagerAkkaURL)
 
   jobManager ! decorateMessage(
-RegisterApplicationClient
+RegisterInfoMessageListener.get()
--- End diff --

`getInstance`?
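
The question concerns naming the accessor of a singleton message `getInstance()` instead of `get()`. The sketch below shows that convention in Java for a parameterless message. The class `RegisterInfoMessageListenerSketch` is a hypothetical stand-in and is not claimed to match the class in the pull request.

```java
import java.io.Serializable;

/** Hypothetical stand-in for a parameterless, serializable message. */
final class RegisterInfoMessageListenerSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    /** The single shared instance; the message carries no state. */
    private static final RegisterInfoMessageListenerSketch INSTANCE =
            new RegisterInfoMessageListenerSketch();

    private RegisterInfoMessageListenerSketch() {}

    /** Conventional accessor name for a singleton. */
    static RegisterInfoMessageListenerSketch getInstance() {
        return INSTANCE;
    }

    /** Keeps the singleton property when the message is deserialized. */
    private Object readResolve() {
        return INSTANCE;
    }

    @Override
    public String toString() {
        return "RegisterInfoMessageListener";
    }
}
```

With this shape, the call site in the diff would read `RegisterInfoMessageListenerSketch.getInstance()`, which signals to the reader that no new object is created per message.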




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206704#comment-15206704
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57018075
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---
@@ -148,7 +148,7 @@ class ApplicationClient(
   INITIAL_POLLING_DELAY,
   WAIT_FOR_YARN_INTERVAL,
   yarnJobManager.get,
-  decorateMessage(PollYarnClusterStatus))
+  decorateMessage(GetClusterStatus.get()))
--- End diff --

`getInstance`?




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206658#comment-15206658
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57015042
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link YarnFlinkResourceManager}.
+ * 
+ * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnApplicationMasterRunner {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+   
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+   
+   /** The exit code returned if the initialization of the application master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+
+   // 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206625#comment-15206625
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57011989
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link YarnFlinkResourceManager}.
+ * 
+ * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnApplicationMasterRunner {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+   
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+   
+   /** The exit code returned if the initialization of the application master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+
+   // 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206622#comment-15206622
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57011773
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -39,24 +39,32 @@ import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.SuppressRestartsException
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager
+import org.apache.flink.runtime.clusterframework.messages._
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, RestartStrategyFactory}
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
+import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo,
--- End diff --

`HardwareDescription` and `InstanceConnectionInfo` are unused.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206619#comment-15206619
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57011690
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -23,7 +23,7 @@ import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress,
 import java.util.UUID
 import java.util.concurrent.{TimeUnit, ExecutorService}
 
-import akka.actor.Status.Failure
+import akka.actor.Status.{Success, Failure}
--- End diff --

`Success` unused




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206614#comment-15206614
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57011449
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -60,8 +63,8 @@
 * See documentation
 */
public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
-   float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
-   int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_MIN_HEAP_CUTOFF);
+   float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.25f);
+   int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 600);
--- End diff --

I was actually asking why `600` is hard-coded here instead of using 
`ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF`.
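
For context on why the two defaults matter, here is a minimal sketch of how a 
cutoff ratio and a minimum cutoff could combine into a heap size. The body of 
`calculateHeapSize` is not part of the quoted diff, so the arithmetic below is 
an assumption; the class name `HeapCutoffSketch` and the parameter names are 
hypothetical. Only the values `0.25f` and `600` come from the diff.

```java
// Hypothetical, self-contained sketch; not the actual Utils.calculateHeapSize.
final class HeapCutoffSketch {

    /**
     * Assumed behavior: reserve max(memory * ratio, minCutoff) MB for
     * non-heap use and give the rest of the container to the JVM heap.
     */
    public static int calculateHeapSize(int containerMemoryMb, float cutoffRatio, int minCutoffMb) {
        if (cutoffRatio < 0 || cutoffRatio > 1) {
            throw new IllegalArgumentException("cutoff ratio must be between 0 and 1");
        }
        if (minCutoffMb > containerMemoryMb) {
            throw new IllegalArgumentException("minimum cutoff exceeds container memory");
        }
        // The ratio-based cutoff is raised to the minimum if it falls below it.
        int cutoff = Math.max((int) (containerMemoryMb * cutoffRatio), minCutoffMb);
        return containerMemoryMb - cutoff;
    }

    public static void main(String[] args) {
        // Ratio dominates: 4096 * 0.25 = 1024 MB cutoff.
        System.out.println(calculateHeapSize(4096, 0.25f, 600)); // prints 3072
        // Minimum dominates: 1024 * 0.25 = 256 MB, raised to 600 MB.
        System.out.println(calculateHeapSize(1024, 0.25f, 600)); // prints 424
    }
}
```

Under these assumptions, a changed default for the minimum cutoff shifts the 
heap of every small container, which is why a named constant is easier to 
audit than a literal.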




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206594#comment-15206594
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57009817
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -60,8 +63,8 @@
 * See documentation
 */
public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
-   float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
-   int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_MIN_HEAP_CUTOFF);
+   float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.25f);
+   int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 600);
--- End diff --

Originally, these were going to be replaced by generalized keys and values. 
However, we need default values for every resource management system, so I'll 
use the generalized keys with the default YARN values here, like so:

```java
float memoryCutoffRatio = conf.getFloat(
    ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO,
    ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
int minCutoff = conf.getInteger(
    ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN,
    ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
```

The deprecated keys are replaced beforehand using:

```java
BootstrapTools.substituteDeprecatedConfigKey(conf,
    ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
    ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO);
BootstrapTools.substituteDeprecatedConfigKey(conf,
    ConfigConstants.YARN_HEAP_CUTOFF_MIN,
    ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN);
```
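
To illustrate the idea of the substitution step, here is a rough, 
self-contained sketch that uses a plain `Map` in place of Flink's 
`Configuration`. The semantics are an assumption (the deprecated value is 
copied only when the new key is not set); the class `DeprecatedKeySketch`, its 
helpers, and the key strings are hypothetical and are not the actual 
`BootstrapTools` code or the real key names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of deprecated-key substitution; not the Flink implementation.
final class DeprecatedKeySketch {

    /** Copies the deprecated key's value to the designated key unless the latter is already set. */
    public static void substituteDeprecatedKey(Map<String, String> conf, String deprecated, String designated) {
        if (!conf.containsKey(designated)) {
            String oldValue = conf.get(deprecated);
            if (oldValue != null) {
                conf.put(designated, oldValue);
            }
        }
    }

    /** Reads a float from the map, falling back to the given default when the key is absent. */
    public static float getFloat(Map<String, String> conf, String key, float defaultValue) {
        String value = conf.get(key);
        return value == null ? defaultValue : Float.parseFloat(value);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // The user still configures the old (placeholder) key.
        conf.put("old.heap-cutoff-ratio", "0.3");

        substituteDeprecatedKey(conf, "old.heap-cutoff-ratio", "new.heap-cutoff-ratio");

        // Later code only needs to read the new key; the default applies when neither key is set.
        System.out.println(getFloat(conf, "new.heap-cutoff-ratio", 0.25f)); // prints 0.3
    }
}
```

With this ordering, the lookup code only has to know the generalized key and 
one default per resource management system.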




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206573#comment-15206573
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57008796
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link YarnFlinkResourceManager}.
+ * 
+ * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnApplicationMasterRunner {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+   
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+   
+   /** The exit code returned if the initialization of the application master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+
+   // 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206575#comment-15206575
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57008853
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link YarnFlinkResourceManager}.
+ * 
+ * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnApplicationMasterRunner {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+   
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+   
+   /** The exit code returned if the initialization of the application master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+
+   // 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206544#comment-15206544
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57006429
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
+
+   private static ActorSystem system;
+
+   private static Configuration config = new Configuration();
+
+   @Before
+   public void setup() {
+   system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+   }
+
+   @After
+   public void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Tests whether the resource manager connects and reconciles existing task managers.
+*/
+   @Test
+   public void testResourceManagerReconciliation() {
+
+   new JavaTestKit(system){{
+   new Within(duration("10 seconds")) {
+   @Override
+   protected void run() {
+
+   ActorGateway jobManager = TestingUtils.createJobManager(system, config);
+   ActorGateway me =
+   TestingUtils.createForwardingActor(system, getTestActor(), Option.empty());
+
+   // !! no resource manager started !!
+
+   ResourceID resourceID = ResourceID.generate();
+
+   jobManager.tell(
+   new RegistrationMessages.RegisterTaskManager(
+   resourceID,
+   Mockito.mock(InstanceConnectionInfo.class),
+   null,
+   1),
+   me);
+
+   expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
+
+   // now start the resource manager
+   ActorGateway resourceManager =
+   TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+   // register at testing job manager to receive a message once a resource manager registers
+   resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+   // Wait for resource manager
+   expectMsgEquals(Messages.getAcknowledge());
+
+   // check if we registered the task manager resource
+   

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206540#comment-15206540
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57006267
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
+
+   private static ActorSystem system;
+
+   private static Configuration config = new Configuration();
+
+   @Before
+   public void setup() {
+   system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+   }
+
+   @After
+   public void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Tests whether the resource manager connects and reconciles existing task managers.
+*/
+   @Test
+   public void testResourceManagerReconciliation() {
+
+   new JavaTestKit(system){{
+   new Within(duration("10 seconds")) {
+   @Override
+   protected void run() {
+
+   ActorGateway jobManager = TestingUtils.createJobManager(system, config);
+   ActorGateway me =
+   TestingUtils.createForwardingActor(system, getTestActor(), Option.empty());
+
+   // !! no resource manager started !!
+
+   ResourceID resourceID = ResourceID.generate();
+
+   jobManager.tell(
+   new RegistrationMessages.RegisterTaskManager(
+   resourceID,
+   Mockito.mock(InstanceConnectionInfo.class),
+   null,
+   1),
+   me);
+
+   expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
+
+   // now start the resource manager
+   ActorGateway resourceManager =
+   TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+   // register at testing job manager to receive a message once a resource manager registers
+   resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+   // Wait for resource manager
+   expectMsgEquals(Messages.getAcknowledge());
+
+   // check if we registered the task manager resource
+   

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206529#comment-15206529
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57005630
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
+
+   private static ActorSystem system;
+
+   private static Configuration config = new Configuration();
+
+   @Before
+   public void setup() {
+   system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+   }
+
+   @After
+   public void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Tests whether the resource manager connects and reconciles existing 
task managers.
+*/
+   @Test
+   public void testResourceManagerReconciliation() {
+
+   new JavaTestKit(system){{
+   new Within(duration("10 seconds")) {
--- End diff --

Then I guess we have a different understanding of readability.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206533#comment-15206533
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57005727
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, Input
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}
 
+import scala.annotation.implicitNotFound
--- End diff --

Good question. Hopefully it's the latter.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206525#comment-15206525
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57004770
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -60,8 +63,8 @@
 * See documentation
 */
public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
-   float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
-   int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_MIN_HEAP_CUTOFF);
+   float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.25f);
+   int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 600);
--- End diff --

Why not use the default values defined in `ConfigConstants`?
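
A minimal sketch of the point raised here, not the code from the pull request. The class `HeapDefaults`, the constant names, the string keys, and the plain `Map` standing in for Flink's `Configuration` are all hypothetical, and the cutoff formula is an assumption made for illustration. The fallback values (0.25 and 600, as in the diff) are defined once under a name, so call sites cannot drift from them:

```java
import java.util.HashMap;
import java.util.Map;

class HeapDefaults {
    // Hypothetical constants; the values mirror the literals in the diff above.
    static final float DEFAULT_CUTOFF_RATIO = 0.25f;
    static final int DEFAULT_MIN_CUTOFF_MB = 600;

    // Assumed formula (not taken from the pull request): subtract the larger of
    // the ratio-based cutoff and the minimum cutoff from the container memory.
    static int calculateHeapSize(int memoryMb, Map<String, String> conf) {
        float ratio = conf.containsKey("cutoff.ratio")
                ? Float.parseFloat(conf.get("cutoff.ratio"))
                : DEFAULT_CUTOFF_RATIO;
        int minCutoff = conf.containsKey("cutoff.min")
                ? Integer.parseInt(conf.get("cutoff.min"))
                : DEFAULT_MIN_CUTOFF_MB;
        int cutoff = Math.max((int) (memoryMb * ratio), minCutoff);
        return memoryMb - cutoff;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Nothing configured: both named defaults apply.
        System.out.println(calculateHeapSize(4096, conf));
        // An explicit minimum cutoff overrides the default.
        conf.put("cutoff.min", "2000");
        System.out.println(calculateHeapSize(4096, conf));
    }
}
```

With literals at the call site, changing a default later means finding every copy of `0.25f` and `600`; with named constants there is a single place to change.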




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206506#comment-15206506
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57002955
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, Input
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}
 
+import scala.annotation.implicitNotFound
--- End diff --

Ah, true. Why does our Scala checkstyle not detect unused imports? Is that 
not supported, or just not configured?




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206499#comment-15206499
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57002327
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, Input
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}
 
+import scala.annotation.implicitNotFound
--- End diff --

Unused import




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206489#comment-15206489
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57001487
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
+
+   private static ActorSystem system;
+
+   private static Configuration config = new Configuration();
+
+   @Before
+   public void setup() {
+   system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+   }
+
+   @After
+   public void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Tests whether the resource manager connects and reconciles existing 
task managers.
+*/
+   @Test
+   public void testResourceManagerReconciliation() {
+
+   new JavaTestKit(system){{
+   new Within(duration("10 seconds")) {
+   @Override
+   protected void run() {
+
+   ActorGateway jobManager = TestingUtils.createJobManager(system, config);
+   ActorGateway me =
+   TestingUtils.createForwardingActor(system, getTestActor(), Option.empty());
+
+   // !! no resource manager started !!
+
+   ResourceID resourceID = ResourceID.generate();
+
+   jobManager.tell(
+   new RegistrationMessages.RegisterTaskManager(
+   resourceID,
+   Mockito.mock(InstanceConnectionInfo.class),
+   null,
+   1),
+   me);
+
+   expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
+
+   // now start the resource manager
+   ActorGateway resourceManager =
+   TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+   // register at testing job manager to receive a message once a resource manager registers
+   resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+   // Wait for resource manager
+   expectMsgEquals(Messages.getAcknowledge());
+
+   // check if we registered the task manager resource
+   

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206482#comment-15206482
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57000864
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
+
+   private static ActorSystem system;
+
+   private static Configuration config = new Configuration();
+
+   @Before
+   public void setup() {
+   system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+   }
+
+   @After
+   public void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Tests whether the resource manager connects and reconciles existing 
task managers.
+*/
+   @Test
+   public void testResourceManagerReconciliation() {
+
+   new JavaTestKit(system){{
+   new Within(duration("10 seconds")) {
--- End diff --

This is intentional. For me, readability sometimes takes precedence over 
strict style.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206485#comment-15206485
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57000923
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+public class BootstrapToolsTest {
+
+   @Test
+   public void testSubstituteConfigKey() {
+   String deprecatedKey1 ="deprecated-key";
+   String deprecatedKey2 ="another-out_of-date_key";
+   String deprecatedKey3 ="yet-one-more";
+
+   String designatedKey1 ="newkey1";
+   String designatedKey2 ="newKey2";
+   String designatedKey3 ="newKey3";
+
+   String value1 = "value1";
+   String value2_designated = "designated-value2";
+   String value2_deprecated = "deprecated-value2";
+
+   // config contains only deprecated key 1, and for key 2 both 
deprecated and designated
+   Configuration cfg = new Configuration();
+   cfg.setString(deprecatedKey1, value1);
+   cfg.setString(deprecatedKey2, value2_deprecated);
+   cfg.setString(designatedKey2, value2_designated);
+
+   BootstrapTools.substituteDeprecatedConfigKey(cfg, 
deprecatedKey1, designatedKey1);
+   BootstrapTools.substituteDeprecatedConfigKey(cfg, 
deprecatedKey2, designatedKey2);
+   BootstrapTools.substituteDeprecatedConfigKey(cfg, 
deprecatedKey3, designatedKey3);
+
+   // value 1 should be set to designated
+   assertEquals(value1, cfg.getString(designatedKey1, null));
+
+   // value 2 should not have been set, since it had a value 
already
+   assertEquals(value2_designated, cfg.getString(designatedKey2, 
null));
+
+   // nothing should be in there for key 3
+   assertNull(cfg.getString(designatedKey3, null));
+   assertNull(cfg.getString(deprecatedKey3, null));
+   }
+
+   @Test
+   public void testSubstituteConfigKeyPrefix() {
+   String deprecatedPrefix1 ="deprecated-prefix";
+   String deprecatedPrefix2 ="-prefix-2";
+   String deprecatedPrefix3 ="prefix-3";
+
+   String designatedPrefix1 ="p1";
+   String designatedPrefix2 ="ppp";
+   String designatedPrefix3 ="zzz";
+
+   String depr1 = deprecatedPrefix1 + "var";
+   String depr2 = deprecatedPrefix2 + "env";
+   String depr3 = deprecatedPrefix2 + "x";
+
+   String desig1 = deprecatedPrefix1 + "var";
+   String desig2 = deprecatedPrefix2 + "env";
+   String desig3 = deprecatedPrefix2 + "x";
--- End diff --

Nope. Good catch.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206484#comment-15206484
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57000909
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
--- End diff --

Thanks. Adding.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206399#comment-15206399
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56989537
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+public class BootstrapToolsTest {
+
+   @Test
+   public void testSubstituteConfigKey() {
+   String deprecatedKey1 ="deprecated-key";
+   String deprecatedKey2 ="another-out_of-date_key";
+   String deprecatedKey3 ="yet-one-more";
+
+   String designatedKey1 ="newkey1";
+   String designatedKey2 ="newKey2";
+   String designatedKey3 ="newKey3";
+
+   String value1 = "value1";
+   String value2_designated = "designated-value2";
+   String value2_deprecated = "deprecated-value2";
+
+   // config contains only deprecated key 1, and for key 2 both 
deprecated and designated
+   Configuration cfg = new Configuration();
+   cfg.setString(deprecatedKey1, value1);
+   cfg.setString(deprecatedKey2, value2_deprecated);
+   cfg.setString(designatedKey2, value2_designated);
+
+   BootstrapTools.substituteDeprecatedConfigKey(cfg, 
deprecatedKey1, designatedKey1);
+   BootstrapTools.substituteDeprecatedConfigKey(cfg, 
deprecatedKey2, designatedKey2);
+   BootstrapTools.substituteDeprecatedConfigKey(cfg, 
deprecatedKey3, designatedKey3);
+
+   // value 1 should be set to designated
+   assertEquals(value1, cfg.getString(designatedKey1, null));
+
+   // value 2 should not have been set, since it had a value 
already
+   assertEquals(value2_designated, cfg.getString(designatedKey2, 
null));
+
+   // nothing should be in there for key 3
+   assertNull(cfg.getString(designatedKey3, null));
+   assertNull(cfg.getString(deprecatedKey3, null));
+   }
+
+   @Test
+   public void testSubstituteConfigKeyPrefix() {
+   String deprecatedPrefix1 ="deprecated-prefix";
+   String deprecatedPrefix2 ="-prefix-2";
+   String deprecatedPrefix3 ="prefix-3";
+
+   String designatedPrefix1 ="p1";
+   String designatedPrefix2 ="ppp";
+   String designatedPrefix3 ="zzz";
+
+   String depr1 = deprecatedPrefix1 + "var";
+   String depr2 = deprecatedPrefix2 + "env";
+   String depr3 = deprecatedPrefix2 + "x";
+
+   String desig1 = deprecatedPrefix1 + "var";
+   String desig2 = deprecatedPrefix2 + "env";
+   String desig3 = deprecatedPrefix2 + "x";
--- End diff --

Is it intended that you used the deprecatedPrefix for the designated keys?
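
To make the concern concrete, here is a small sketch. It uses a plain `Map` and a hypothetical `substituteDeprecatedKey` helper rather than Flink's `BootstrapTools`, and its rules are inferred only from the comments in the test above (copy the deprecated value to the designated key unless the designated key already has a value; do nothing if the deprecated key is absent). The designated key is built from the designated prefix, which is presumably what the test intended:

```java
import java.util.HashMap;
import java.util.Map;

class ConfigKeySubstitution {

    // Hypothetical helper: copies the value of a deprecated key to its
    // designated key, unless the designated key already has a value.
    static void substituteDeprecatedKey(
            Map<String, String> cfg, String deprecated, String designated) {
        String value = cfg.get(deprecated);
        if (value != null && !cfg.containsKey(designated)) {
            cfg.put(designated, value);
        }
    }

    public static void main(String[] args) {
        String deprecatedPrefix = "deprecated-prefix";
        String designatedPrefix = "p1";

        // Deprecated key from the deprecated prefix,
        // designated key from the designated prefix.
        String depr = deprecatedPrefix + "var";
        String desig = designatedPrefix + "var";

        Map<String, String> cfg = new HashMap<>();
        cfg.put(depr, "value1");
        substituteDeprecatedKey(cfg, depr, desig);

        System.out.println(desig + " = " + cfg.get(desig));
    }
}
```

If both names were built from the deprecated prefix, `depr` and `desig` would be the same string, and an assertion on the designated key would pass even if no substitution happened at all.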




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206405#comment-15206405
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56990053
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
--- End diff --

Extending `TestLogger` will give us nice log output showing which test case 
is currently being run.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206407#comment-15206407
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56990136
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
+
+   private static ActorSystem system;
+
+   private static Configuration config = new Configuration();
+
+   @Before
+   public void setup() {
+   system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+   }
+
+   @After
+   public void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Tests whether the resource manager connects and reconciles existing 
task managers.
+*/
+   @Test
+   public void testResourceManagerReconciliation() {
+
+   new JavaTestKit(system){{
+   new Within(duration("10 seconds")) {
--- End diff --

The indentation is off here.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206377#comment-15206377
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56987194
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 ---
@@ -211,6 +238,13 @@ class LocalFlinkMiniCluster(
   JobManager.JOB_MANAGER_NAME
 }
   }
+  protected def getResourceManagerName(index: Int): String = {
+if(singleActorSystem) {
+  FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1)
+} else {
+  FlinkResourceManager.RESOURCE_MANAGER_NAME
+}
+  }
--- End diff --

Corrected, but not part of this diff.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206352#comment-15206352
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56985677
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and let the rm reconnect
+  self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager registration
+  self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206344#comment-15206344
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56985225
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -928,11 +934,6 @@ class TaskManager(
   jm => context.unwatch(jm)
 }
 
-// de-register from the JobManager (faster detection of disconnect)
-currentJobManager foreach {
-  _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is disassociating"))
-}
--- End diff --

Also an artifact of the original approach. Reverting.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206335#comment-15206335
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56984100
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -592,19 +605,18 @@ class TaskManager(
 log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
   s"because the TaskManager is already registered at ${currentJobManager.orNull}")
   }
-}
-else {
+} else {
   // not yet connected, so let's associate with that JobManager
   try {
 associateWithJobManager(jobManager, id, blobPort)
   } catch {
 case t: Throwable =>
   killTaskManagerFatal(
-"Unable to start TaskManager components after registering at JobManager", t)
+"Unable to start TaskManager components and associate with the JobManager ", t)
--- End diff --

Not necessary. Removing whitespace.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206333#comment-15206333
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56983997
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -632,34 +643,32 @@ class TaskManager(
 
   case RefuseRegistration(reason) =>
 if (currentJobManager.isEmpty) {
-  log.error(s"The registration at JobManager $jobManagerAkkaURL was refused, " +
+  log.error(s"The registration at ResourceManager $jobManagerAkkaURL was refused, " +
--- End diff --

This is an artifact of refactoring the original approach that I followed 
where the TaskManager would register at the ResourceManager. Reverting.
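
For context, the JobManager.scala diff quoted elsewhere in this thread has the TaskManager register at the JobManager, which then asks the ResourceManager (if one is connected) and registers the TaskManager anyway if the ResourceManager is slow or unreachable. A minimal sketch of that decision, assuming plain Scala futures instead of Akka actors; `Registration`, `Outcome`, `RegistrationSketch`, and `askResourceManager` are hypothetical names for illustration, not part of the Flink code base:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Hypothetical stand-in for the RegisterTaskManager message.
final case class Registration(resourceId: String, slots: Int)

// Hypothetical outcomes of a registration attempt at the JobManager.
sealed trait Outcome
case object ConfirmedByResourceManager extends Outcome
case object RegisteredWithoutResourceManager extends Outcome
case object RegisteredAndResourceManagerDropped extends Outcome

object RegistrationSketch {
  // `askResourceManager` models the ask (`?`) sent to the ResourceManager;
  // None means no ResourceManager is currently connected.
  def register(
      reg: Registration,
      askResourceManager: Option[Registration => Future[Unit]])(
      implicit ec: ExecutionContext): Future[Outcome] =
    askResourceManager match {
      case Some(ask) =>
        ask(reg)
          // the ResourceManager answered in time
          .map[Outcome](_ => ConfirmedByResourceManager)
          // timeout or failure: register anyway and drop the ResourceManager
          .recover[Outcome] { case NonFatal(_) => RegisteredAndResourceManagerDropped }
      case None =>
        // no ResourceManager yet: register now, it learns about the
        // TaskManager later when it registers itself
        Future.successful(RegisteredWithoutResourceManager)
    }
}
```

The point of the sketch is only that the ResourceManager never blocks a TaskManager registration; it is consulted when available and reconnects otherwise.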




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206329#comment-15206329
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56983799
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -592,19 +605,18 @@ class TaskManager(
 log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
   s"because the TaskManager is already registered at ${currentJobManager.orNull}")
   }
-}
-else {
+} else {
   // not yet connected, so let's associate with that JobManager
   try {
 associateWithJobManager(jobManager, id, blobPort)
   } catch {
 case t: Throwable =>
   killTaskManagerFatal(
-"Unable to start TaskManager components after registering at JobManager", t)
+"Unable to start TaskManager components and associate with the JobManager ", t)
   }
 }
 
-  // we are already registered at that specific JobManager - duplicate answer, rare cases
+  // we are already registered at this ResourceManager - duplicate answer, rare cases
   case AlreadyRegistered(id, blobPort) =>
 val jobManager = sender()
--- End diff --

This is an artifact of refactoring the original approach that I followed 
where the TaskManager would register at the ResourceManager. Reverting.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206330#comment-15206330
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56983829
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -613,12 +625,11 @@ class TaskManager(
 log.debug("Ignoring duplicate registration acknowledgement.")
   } else {
 log.warn(s"Received 'AlreadyRegistered' message from " +
-  s"JobManager ${jobManager.path}, even through TaskManager is currently " +
+  s"ResourceManager ${jobManager.path}, even through TaskManager is currently " +
--- End diff --

This is an artifact of refactoring the original approach that I followed 
where the TaskManager would register at the ResourceManager. Reverting.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206188#comment-15206188
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56969482
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1823,8 +1836,8 @@ object TaskManager {
* @param taskManagerUrl The akka URL of the JobManager.
* @param system The local actor system that should perform the lookup.
* @param timeout The maximum time to wait until the lookup fails.
-   * @throws java.io.IOException Thrown, if the lookup fails.
-   * @return The ActorRef to the TaskManager
+* @throws java.io.IOException Thrown, if the lookup fails.
+* @return The ActorRef to the TaskManager
--- End diff --

formatting regression




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206189#comment-15206189
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56969518
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -2066,8 +2079,7 @@ object TaskManager {
* @param parameter The parameter value. Will be shown in the exception message.
* @param name The name of the config parameter. Will be shown in the exception message.
* @param errorMessage The optional custom error message to append to the exception message.
-   *
-   * @throws IllegalConfigurationException Thrown if the condition is violated.
+* @throws IllegalConfigurationException Thrown if the condition is violated.
--- End diff --

Wrong indentation




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206187#comment-15206187
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56969207
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -928,11 +934,6 @@ class TaskManager(
   jm => context.unwatch(jm)
 }
 
-// de-register from the JobManager (faster detection of disconnect)
-currentJobManager foreach {
-  _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is disassociating"))
-}
--- End diff --

Why don't we need this anymore?




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206181#comment-15206181
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56968776
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -632,34 +643,32 @@ class TaskManager(
 
   case RefuseRegistration(reason) =>
 if (currentJobManager.isEmpty) {
-  log.error(s"The registration at JobManager $jobManagerAkkaURL was refused, " +
+  log.error(s"The registration at ResourceManager $jobManagerAkkaURL was refused, " +
--- End diff --

RM vs. JM conflict?




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206178#comment-15206178
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56968677
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -613,12 +625,11 @@ class TaskManager(
 log.debug("Ignoring duplicate registration acknowledgement.")
   } else {
 log.warn(s"Received 'AlreadyRegistered' message from " +
-  s"JobManager ${jobManager.path}, even through TaskManager is currently " +
+  s"ResourceManager ${jobManager.path}, even through TaskManager is currently " +
--- End diff --

Does `jobManager` contain the `ActorRef` of the RM or the JM?




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206177#comment-15206177
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56968619
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -592,19 +605,18 @@ class TaskManager(
 log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
   s"because the TaskManager is already registered at ${currentJobManager.orNull}")
   }
-}
-else {
+} else {
   // not yet connected, so let's associate with that JobManager
   try {
 associateWithJobManager(jobManager, id, blobPort)
   } catch {
 case t: Throwable =>
   killTaskManagerFatal(
-"Unable to start TaskManager components after registering at JobManager", t)
+"Unable to start TaskManager components and associate with the JobManager ", t)
   }
 }
 
-  // we are already registered at that specific JobManager - duplicate answer, rare cases
+  // we are already registered at this ResourceManager - duplicate answer, rare cases
   case AlreadyRegistered(id, blobPort) =>
 val jobManager = sender()
--- End diff --

Is this the JM or the RM address which we store here in `jobManager`?




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206176#comment-15206176
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56968482
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -592,19 +605,18 @@ class TaskManager(
 log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
   s"because the TaskManager is already registered at ${currentJobManager.orNull}")
   }
-}
-else {
+} else {
   // not yet connected, so let's associate with that JobManager
   try {
 associateWithJobManager(jobManager, id, blobPort)
   } catch {
 case t: Throwable =>
   killTaskManagerFatal(
-"Unable to start TaskManager components after registering at JobManager", t)
+"Unable to start TaskManager components and associate with the JobManager ", t)
--- End diff --

Why the whitespace at the end of the error message?




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206167#comment-15206167
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56967954
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 ---
@@ -211,6 +238,13 @@ class LocalFlinkMiniCluster(
   JobManager.JOB_MANAGER_NAME
 }
   }
+  protected def getResourceManagerName(index: Int): String = {
--- End diff --

line break missing before this line




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206168#comment-15206168
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56968005
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 ---
@@ -211,6 +238,13 @@ class LocalFlinkMiniCluster(
   JobManager.JOB_MANAGER_NAME
 }
   }
+  protected def getResourceManagerName(index: Int): String = {
+if(singleActorSystem) {
+  FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1)
+} else {
+  FlinkResourceManager.RESOURCE_MANAGER_NAME
+}
+  }
--- End diff --

After this line there is also a line break missing
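
As an aside on what the quoted method does: Akka requires actor names to be unique among siblings, which is presumably why the name gets an index suffix when all components share one actor system. A minimal standalone sketch of the same naming rule; the object name, the constant value, and the explicit `singleActorSystem` parameter are assumptions for illustration (the real constant is `FlinkResourceManager.RESOURCE_MANAGER_NAME`, whose value is not shown in this thread):

```scala
object ResourceManagerNaming {
  // Hypothetical placeholder for FlinkResourceManager.RESOURCE_MANAGER_NAME.
  val ResourceManagerName = "resourcemanager"

  // With a shared actor system, append a 1-based index to keep names unique;
  // with one actor system per component, the plain name is enough.
  def resourceManagerName(index: Int, singleActorSystem: Boolean): String =
    if (singleActorSystem) s"${ResourceManagerName}_${index + 1}"
    else ResourceManagerName
}
```

The suffix is 1-based (`index + 1`), matching the quoted diff.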




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206133#comment-15206133
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56964129
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and let the rm reconnect
+  self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager registration
+  self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206128#comment-15206128
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56963220
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and let the rm reconnect
+  self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager registration
+  self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206091#comment-15206091
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56960595
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206078#comment-15206078
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56959658
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -87,7 +91,7 @@ object RegistrationMessages {
 extends RegistrationMessage
 
   /**
-   * Denotes the unsuccessful registration of a task manager at the job 
manager. This is the
+   * Denotes the unsuccessful registration of a task manager at the 
JobManager. This is the
--- End diff --

I've made it consistent.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204869#comment-15204869
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56875016
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204823#comment-15204823
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56872406
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -36,7 +38,7 @@ object RegistrationMessages {
   /**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
-   * @param jobManagerURL Akka URL to the JobManager
+   * @param jobManagerURL Akka URL to the JobManager to ask for the 
JobManager
--- End diff --

Reverted the comment.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204799#comment-15204799
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56870497
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * A standalone implementation of the resource manager. Used when the 
system is started in
+ * standalone mode (via scripts), rather than via a resource framework 
like YARN or Mesos.
+ */
+public class StandaloneResourceManager extends 
FlinkResourceManager {
+   
+
+   public StandaloneResourceManager(Configuration flinkConfig, 
LeaderRetrievalService leaderRetriever) {
+   super(0, flinkConfig, leaderRetriever);
+   }
+
+   // 

+   //  Framework specific behavior
+   // 

+
+
+   @Override
+   protected void newJobManagerLeaderAvailable(String leaderAddress, UUID 
leaderSessionID) {
+   super.newJobManagerLeaderAvailable(leaderAddress, 
leaderSessionID);
+   }
+
+   @Override
+   protected void initialize() throws Exception {
+   // nothing to initialize
+   }
+
+   @Override
+   protected void leaderUpdated() {
+   // nothing to update
+   }
+
+   @Override
+   protected void shutdownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
+   }
+
+   @Override
+   protected void fatalError(String message, Throwable error) {
+   log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
+   LOG.error("Shutting down process");
--- End diff --

Yep, will change that. There is the Actor logger and the RM logger. No need 
to have both.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204787#comment-15204787
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56869299
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -29,22 +29,22 @@ object Messages {
   case object Acknowledge
 
   /**
-   * Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
-   *
-   * The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
-   * loss more quickly.
-   *
-   * The JobManager may send this message to its TaskManagers to let them 
clean up their
-   * tasks that depend on the JobManager and go into a clean state.
-   *
-   * @param reason The reason for disconnecting, to be displayed in log 
and error messages.
-   */
-  case class Disconnect(reason: String) extends RequiresLeaderSessionID
+* Accessor for the case object instance, to simplify Java 
interoperability.
+*
+* @return The Acknowledge case object instance.
+*/
+  def getAcknowledge(): Acknowledge.type = Acknowledge
 
   /**
-   * Accessor for the case object instance, to simplify Java 
interoperability.
-   *
-   * @return The Acknowledge case object instance.
-   */
-  def getAcknowledge(): Acknowledge.type = Acknowledge
+* Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
+*
+* The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
+* loss more quickly.
+*
+* The JobManager may send this message to its TaskManagers to let them 
clean up their
+* tasks that depend on the JobManager and go into a clean state.
+*
+* @param reason The reason for disconnecting, to be displayed in log 
and error messages.
+*/
+  case class Disconnect(reason: String) extends RequiresLeaderSessionID
--- End diff --

You're right, there is no need to change the order here. I'll revert the 
changes.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204782#comment-15204782
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56869070
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status 
${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+  // trigger shutdown
+  shutdown()
+
+case None =>
+  // retry
+  context.system.scheduler.scheduleOnce(
+2 seconds, self, decorateMessage(msg)
+  )(context.dispatcher)
--- End diff --

You're right. There should be an upper bound on the number of retries. If 
there was never an RM, then it behaves the same.
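
A minimal sketch of what a bounded retry decision could look like, written in 
plain Java without Akka. The class, enum, and parameter names are hypothetical 
and not part of the pull request, and shutting down without an RM once the 
bound is reached is an assumption about the desired behavior.

```java
// Sketch only: plain Java, no Akka. All names here are hypothetical.
final class StopClusterRetry {

    /** What the JobManager would do next for a StopCluster message. */
    enum Action { INFORM_RM_AND_SHUTDOWN, RETRY_LATER, SHUTDOWN_WITHOUT_RM }

    /**
     * @param rmConnected   whether a ResourceManager is currently registered
     * @param attemptsSoFar retries already performed for this message
     * @param maxRetries    assumed upper bound on the number of retries
     */
    static Action decide(boolean rmConnected, int attemptsSoFar, int maxRetries) {
        if (rmConnected) {
            // same as the Some(rm) branch: inform the RM, then shut down
            return Action.INFORM_RM_AND_SHUTDOWN;
        }
        if (attemptsSoFar < maxRetries) {
            // same as the None branch: re-schedule the message to self
            return Action.RETRY_LATER;
        }
        // bound reached: stop waiting for an RM (assumed behavior)
        return Action.SHUTDOWN_WITHOUT_RM;
    }
}
```

The caller would have to carry the attempt count along with the re-scheduled 
message, for example as an extra field on a wrapper message.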




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204711#comment-15204711
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56863532
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -36,7 +38,7 @@ object RegistrationMessages {
   /**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
-   * @param jobManagerURL Akka URL to the JobManager
+   * @param jobManagerURL Akka URL to the JobManager to ask for the 
JobManager
--- End diff --

For me it is the other way around. Maybe change this to 
>Akka URL to the JobManager to ask for the JobManager actor

?




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204706#comment-15204706
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56863204
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204672#comment-15204672
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56860338
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * This message is teh successful response to a heartbeat. 
+ */
+public class HeartbeatAcknowledgement implements RequiresLeaderSessionID, 
java.io.Serializable {
--- End diff --

You're looking at an old commit. I already removed this class but didn't 
want to force-push.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204671#comment-15204671
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56860326
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneMode.java
 ---
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.execution;
+package org.apache.flink.runtime.clusterframework.standalone;
 
-public interface ExecutionObserver {
+/**
+ * The startup mode for the standalone setup,
+ */
+public enum StandaloneMode {
--- End diff --

You're looking at an old commit. I already removed this class but didn't 
want to force-push.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204663#comment-15204663
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56860123
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
--- End diff --

You're right that the message can be lost. Apart from the leader election 
service, there is currently no mechanism to detect a lost connection from the 
RM to the JM.

The above code is part of a special case where the RM doesn't reply to a TM 
registration and the JM decides to disconnect the RM. I suppose we should keep 
retrying to send the TriggerRegistrationAtJobManager until we receive a 
registration from an RM.
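
A rough sketch of the "keep re-sending until a registration arrives" idea, in 
plain Java without Akka. The class and method names are hypothetical. It 
assumes a periodic timer tick drives the re-send and that the trigger message 
is safe to deliver more than once.

```java
// Sketch only: plain Java, no Akka. All names here are hypothetical.
final class RegistrationTrigger {

    private boolean registered = false;
    private int sent = 0;

    /**
     * Called on every timer tick.
     *
     * @return true if the trigger message should be (re)sent now
     */
    boolean onTick() {
        if (registered) {
            // an RM has registered in the meantime, stop re-sending
            return false;
        }
        sent++;
        return true;
    }

    /** Called when a RegisterResourceManager message arrives. */
    void onResourceManagerRegistered() {
        registered = true;
    }

    /** Number of trigger messages requested so far. */
    int sentCount() {
        return sent;
    }
}
```

Because a lost trigger is simply re-sent on the next tick, no acknowledgement 
beyond the registration itself would be needed.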




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204629#comment-15204629
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56859466
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatMessage.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import java.io.Serializable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Heartbeat message send by a standalone TaskManager to the resource master.
+ */
+public class HeartbeatMessage implements RequiresLeaderSessionID, Serializable {
--- End diff --

You're looking at an old commit. I already removed this class but didn't 
want to force-push.


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204625#comment-15204625
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56859173
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.util.AbstractID;
+
+import java.io.Serializable;
+
+/**
+ * Class for Resource Ids assigned at the FlinkResourceManager.
+ */
+public class ResourceID implements Serializable {
--- End diff --

In standalone mode, this is just a random String, but in YARN mode it is the 
container ID assigned by the framework and thus not random.
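
To illustrate why a plain String wrapper fits both cases, here is a minimal sketch. `SketchResourceId` is a hypothetical stand-in and not the `ResourceID` class from the diff, and the container ID string in `main` is made up.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for a resource identifier; not the class from the pull request. */
final class SketchResourceId implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String value;

    /** Framework case: the ID is handed in from outside, e.g. a container ID. */
    SketchResourceId(String value) {
        this.value = Objects.requireNonNull(value, "value");
    }

    /** Standalone case: no framework supplies an ID, so a random one is generated. */
    static SketchResourceId random() {
        return new SketchResourceId(UUID.randomUUID().toString());
    }

    String value() {
        return value;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof SketchResourceId
                && value.equals(((SketchResourceId) other).value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public String toString() {
        return value;
    }

    public static void main(String[] args) {
        // "container_0001" is an invented example value, not a real YARN ID.
        System.out.println(new SketchResourceId("container_0001"));
        System.out.println(SketchResourceId.random());
    }
}
```

An ID type that can only be generated randomly could not carry an externally assigned value, which is presumably why a String-based ID is used here.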


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204586#comment-15204586
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56856463
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+  // trigger shutdown
+  shutdown()
+
+case None =>
+  // retry
+  context.system.scheduler.scheduleOnce(
+2 seconds, self, decorateMessage(msg)
+  )(context.dispatcher)
--- End diff --

What if an RM was never started? Would that make a difference?


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204596#comment-15204596
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56857119
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -87,7 +91,7 @@ object RegistrationMessages {
 extends RegistrationMessage
 
   /**
-   * Denotes the unsuccessful registration of a task manager at the job manager. This is the
+   * Denotes the unsuccessful registration of a task manager at the JobManager. This is the
--- End diff --

This change and the change in line 74 don't seem to be consistent.


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204594#comment-15204594
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56856958
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -36,7 +38,7 @@ object RegistrationMessages {
   /**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
-   * @param jobManagerURL Akka URL to the JobManager
+   * @param jobManagerURL Akka URL to the JobManager to ask for the JobManager
--- End diff --

I don't understand the comment here. Admittedly, before it was not much 
better.


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204591#comment-15204591
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56856717
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -29,22 +29,22 @@ object Messages {
   case object Acknowledge
 
   /**
-   * Signals that the receiver (JobManager/TaskManager) shall disconnect the sender.
-   *
-   * The TaskManager may send this on shutdown to let the JobManager realize the TaskManager
-   * loss more quickly.
-   *
-   * The JobManager may send this message to its TaskManagers to let them clean up their
-   * tasks that depend on the JobManager and go into a clean state.
-   *
-   * @param reason The reason for disconnecting, to be displayed in log and error messages.
-   */
-  case class Disconnect(reason: String) extends RequiresLeaderSessionID
+* Accessor for the case object instance, to simplify Java interoperability.
+*
+* @return The Acknowledge case object instance.
+*/
+  def getAcknowledge(): Acknowledge.type = Acknowledge
 
   /**
-   * Accessor for the case object instance, to simplify Java interoperability.
-   *
-   * @return The Acknowledge case object instance.
-   */
-  def getAcknowledge(): Acknowledge.type = Acknowledge
+* Signals that the receiver (JobManager/TaskManager) shall disconnect the sender.
+*
+* The TaskManager may send this on shutdown to let the JobManager realize the TaskManager
+* loss more quickly.
+*
+* The JobManager may send this message to its TaskManagers to let them clean up their
+* tasks that depend on the JobManager and go into a clean state.
+*
+* @param reason The reason for disconnecting, to be displayed in log and error messages.
+*/
+  case class Disconnect(reason: String) extends RequiresLeaderSessionID
--- End diff --

I don't see the reason for these changes here.


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204584#comment-15204584
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56855904
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+  // trigger shutdown
+  shutdown()
+
+case None =>
+  // retry
+  context.system.scheduler.scheduleOnce(
+2 seconds, self, decorateMessage(msg)
+  )(context.dispatcher)
--- End diff --

What if we never establish a connection to the RM again for some reason? 
Wouldn't that mean that we never shut down the JM?
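
One way to avoid waiting forever would be to bound the retries. The following is only a sketch under that assumption; `StopClusterPolicy`, its `Action` values, and the `maxWait` bound are hypothetical and do not appear in the pull request.

```java
import java.time.Duration;

/**
 * Hypothetical sketch (not from the pull request): decides what to do with a
 * stop request depending on whether a ResourceManager is connected.
 */
final class StopClusterPolicy {

    enum Action { INFORM_RM_AND_SHUT_DOWN, RETRY_LATER, SHUT_DOWN_WITHOUT_RM }

    private final Duration maxWait; // assumed bound, not an existing Flink setting

    StopClusterPolicy(Duration maxWait) {
        this.maxWait = maxWait;
    }

    Action decide(boolean resourceManagerConnected, Duration waitedSoFar) {
        if (resourceManagerConnected) {
            return Action.INFORM_RM_AND_SHUT_DOWN;
        }
        // No RM: keep retrying only while the bound has not been reached.
        if (waitedSoFar.compareTo(maxWait) >= 0) {
            return Action.SHUT_DOWN_WITHOUT_RM;
        }
        return Action.RETRY_LATER;
    }

    public static void main(String[] args) {
        StopClusterPolicy policy = new StopClusterPolicy(Duration.ofSeconds(10));
        Duration waited = Duration.ZERO;
        Action action = policy.decide(false, waited);
        while (action == Action.RETRY_LATER) {
            waited = waited.plusSeconds(2); // mirrors the 2 second retry in the diff
            action = policy.decide(false, waited);
        }
        System.out.println(action + " after " + waited.getSeconds() + "s");
    }
}
```

With such a bound, the `case None` branch would eventually fall through to a shutdown even if no RM ever connects.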


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204579#comment-15204579
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56855460
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204578#comment-15204578
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56855273
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
--- End diff --

What happens if this message never reaches the ResourceManager? Is there a 
means by which the RM can detect that it lost its connection to the JM?
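
For illustration, one common detection mechanism is a silence timeout: the RM treats the connection as lost if it has heard nothing from the JM for too long. The sketch below assumes the caller supplies timestamps; `PeerLivenessTracker` is a hypothetical name and nothing like it is claimed to exist in the pull request.

```java
/**
 * Hypothetical sketch (not from the pull request): declares the connection
 * lost when nothing has been heard from the peer for longer than a timeout.
 */
final class PeerLivenessTracker {

    private final long timeoutMillis;
    private long lastHeardMillis;

    PeerLivenessTracker(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeardMillis = nowMillis;
    }

    /** Record any message (heartbeat or otherwise) received from the peer. */
    void onMessageReceived(long nowMillis) {
        lastHeardMillis = nowMillis;
    }

    /** True once the silence has lasted strictly longer than the timeout. */
    boolean isConnectionLost(long nowMillis) {
        return nowMillis - lastHeardMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        PeerLivenessTracker tracker = new PeerLivenessTracker(1_000L, 0L);
        System.out.println(tracker.isConnectionLost(500L));   // false
        System.out.println(tracker.isConnectionLost(1_500L)); // true
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic; a real component would read the time itself and would also need a periodic check to notice the silence.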


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204567#comment-15204567
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56854489
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneMode.java
 ---
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.execution;
+package org.apache.flink.runtime.clusterframework.standalone;
 
-public interface ExecutionObserver {
+/**
+ * The startup mode for the standalone setup,
+ */
+public enum StandaloneMode {
--- End diff --

Class is not used


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204564#comment-15204564
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56854344
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * This message is teh successful response to a heartbeat. 
+ */
+public class HeartbeatAcknowledgement implements RequiresLeaderSessionID, java.io.Serializable {
--- End diff --

Class is not used


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204566#comment-15204566
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56854368
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatMessage.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import java.io.Serializable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Heartbeat message send by a standalone TaskManager to the resource master.
+ */
+public class HeartbeatMessage implements RequiresLeaderSessionID, Serializable {
--- End diff --

Class is not used


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204555#comment-15204555
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56853621
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
--- End diff --

Maybe rename `get` to `getInstance`.
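
For reference, a sketch of what a singleton message with a `getInstance` accessor could look like. `StopAcknowledged` is a hypothetical class, not the `StopClusterSuccessful` from the diff; the `readResolve` hook is included on the assumption that the message travels through Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical singleton message; not the class from the pull request. */
final class StopAcknowledged implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final StopAcknowledged INSTANCE = new StopAcknowledged();

    private StopAcknowledged() {
    }

    /** Conventional accessor name for the single instance. */
    static StopAcknowledged getInstance() {
        return INSTANCE;
    }

    /** Keeps the singleton property when the message is deserialized. */
    private Object readResolve() {
        return INSTANCE;
    }

    /** Demo helper: sends a value through Java serialization and back. */
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Object copy = roundTrip(getInstance());
        System.out.println(copy == getInstance());
    }
}
```

`getInstance` is the name most Java readers expect for this pattern, which is the point of the rename suggestion.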


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204550#comment-15204550
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56853405
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
--- End diff --

Formatting regression


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204506#comment-15204506
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56847510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
 ---
@@ -21,7 +21,7 @@
 /**
  * Interface for message decorators
  */
-public interface MessageDecorator {
+public interface MessageDecorator extends java.io.Serializable {
--- End diff --

Good catch :-)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204491#comment-15204491
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56845833
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.util.AbstractID;
+
+import java.io.Serializable;
+
+/**
+ * Class for Resource Ids assigned at the FlinkResourceManager.
+ */
+public class ResourceID implements Serializable {
--- End diff --

Why don't we use `AbstractID` here?


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204476#comment-15204476
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56844658
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * A standalone implementation of the resource manager. Used when the system is started in
+ * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
+ */
+public class StandaloneResourceManager extends FlinkResourceManager {
+
+
+   public StandaloneResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
+       super(0, flinkConfig, leaderRetriever);
+   }
+
+   // ------------------------------------------------------------------------
+   //  Framework specific behavior
+   // ------------------------------------------------------------------------
+
+
+   @Override
+   protected void newJobManagerLeaderAvailable(String leaderAddress, UUID leaderSessionID) {
+       super.newJobManagerLeaderAvailable(leaderAddress, leaderSessionID);
+   }
+
+   @Override
+   protected void initialize() throws Exception {
+       // nothing to initialize
+   }
+
+   @Override
+   protected void leaderUpdated() {
+       // nothing to update
+   }
+
+   @Override
+   protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+   }
+
+   @Override
+   protected void fatalError(String message, Throwable error) {
+       log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
+       LOG.error("Shutting down process");
--- End diff --

Why do we use two different logger variables here?
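
For illustration, a minimal sketch of the single-logger pattern this question points toward. It uses `java.util.logging` only so that it compiles without extra dependencies (the diff itself imports SLF4J), and `SingleLoggerSketch` with its `fatalError` method is a hypothetical stand-in, not the code from the pull request.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for a resource manager class; not the code from the PR.
final class SingleLoggerSketch {

    // One logger per class, referenced consistently by every method.
    private static final Logger LOG = Logger.getLogger(SingleLoggerSketch.class.getName());

    // Writes both records of a fatal error through the same logger.
    static void fatalError(String message, Throwable error) {
        LOG.log(Level.SEVERE, "FATAL ERROR IN RESOURCE MANAGER: " + message, error);
        LOG.severe("Shutting down process");
    }
}
```

With a single field there is no question of which logger a given line goes through, and both records share the same logger name and configuration.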




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199663#comment-15199663
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56518228
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 ---
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *
+ * Worker allocation steps
+ *
+ * 
+ * The resource manager decides to request more workers. This can happen in order
+ * to fill the initial pool, or as a result of the JobManager requesting more workers.
+ *
+ * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
+ * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
+ * should reflect the pending 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199284#comment-15199284
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56484044
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 ---
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *
+ * Worker allocation steps
+ *
+ * 
+ * The resource manager decides to request more workers. This can happen in order
+ * to fill the initial pool, or as a result of the JobManager requesting more workers.
+ *
+ * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
+ * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
+ * should reflect the pending 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199243#comment-15199243
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56479617
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * Message sent to the resource master actor to adjust the designated number of
+ * workers it maintains.
+ */
+public class SetWorkerPoolSize implements RequiresLeaderSessionID, java.io.Serializable{
+
+   private static final long serialVersionUID = -335911350781207609L;
+   
+   private final int numberOfWorkers;
+
+   public SetWorkerPoolSize(int numberOfWorkers) {
+       if (numberOfWorkers < 0) {
+           throw new IllegalArgumentException();
--- End diff --

Added




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199289#comment-15199289
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56484759
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 ---
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *
+ * Worker allocation steps
+ *
+ * 
+ * The resource manager decides to request more workers. This can happen in order
+ * to fill the initial pool, or as a result of the JobManager requesting more workers.
+ *
+ * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
+ * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
+ * should reflect the pending requests.
 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197686#comment-15197686
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56373341
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * Message sent to the resource master actor to adjust the designated number of
+ * workers it maintains.
+ */
+public class SetWorkerPoolSize implements RequiresLeaderSessionID, java.io.Serializable{
+
+   private static final long serialVersionUID = -335911350781207609L;
+   
+   private final int numberOfWorkers;
+
+   public SetWorkerPoolSize(int numberOfWorkers) {
+       if (numberOfWorkers < 0) {
+           throw new IllegalArgumentException();
--- End diff --

Exception message could be helpful
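
A minimal sketch of what such a message might look like. The class below is a simplified stand-in (`SetWorkerPoolSizeSketch` is a hypothetical name, and it leaves out the `RequiresLeaderSessionID` marker interface so that it compiles on its own); the exact wording of the message is an assumption, not what the pull request ended up with.

```java
import java.io.Serializable;

// Simplified stand-in for the message class in the diff; it omits the
// RequiresLeaderSessionID marker interface so the sketch is self-contained.
final class SetWorkerPoolSizeSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final int numberOfWorkers;

    SetWorkerPoolSizeSketch(int numberOfWorkers) {
        if (numberOfWorkers < 0) {
            // Include the offending value so the failure can be diagnosed
            // from the exception alone. The wording is an assumption.
            throw new IllegalArgumentException(
                "Number of workers must not be negative: " + numberOfWorkers);
        }
        this.numberOfWorkers = numberOfWorkers;
    }

    int numberOfWorkers() {
        return numberOfWorkers;
    }
}
```

Calling `new SetWorkerPoolSizeSketch(-1)` then fails with a message that names the rejected value, rather than a bare `IllegalArgumentException`.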




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197708#comment-15197708
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56375125
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatus.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+/**
+ * This message signals the resource master to check how many TaskManagers are
+ * desired, how many are available, and to trigger adjustments if needed.
+ */
+public class GetClusterStatus implements java.io.Serializable {
--- End diff --

Class is not used according to IntelliJ.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199239#comment-15199239
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56479487
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 ---
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *
+ * Worker allocation steps
+ *
+ * 
+ * The resource manager decides to request more workers. This can happen in order
+ * to fill the initial pool, or as a result of the JobManager requesting more workers.
+ *
+ * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
+ * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
+ * should reflect the pending requests.
 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199283#comment-15199283
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56483912
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class describes the basic parameters for launching a TaskManager process.
+ */
+public class ContaineredTaskManagerParameters implements java.io.Serializable {
+
+   private static final long serialVersionUID = -3096987654278064670L;
+   
+   /** Total container memory, in megabytes */
+   private final long totalContainerMemoryMB;
+
+   /** Heap size to be used for the Java process */
+   private final long taskManagerHeapSizeMB;
+
+   /** Direct memory limit for the Java process */
+   private final long taskManagerDirectMemoryLimitMB;
+
+   /** The number of slots per TaskManager */
+   private final int numSlots;
+   
+   /** Environment variables to add to the Java process */
+   private final HashMap taskManagerEnv;
+
+   
+   public ContaineredTaskManagerParameters(
+           long totalContainerMemoryMB, long taskManagerHeapSizeMB,
--- End diff --

I would either put each parameter on its own line or put multiple parameters 
on every line. Multiple parameters on the first line followed by a single 
parameter on each following line is not the most intuitive pattern.
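
As a sketch of the first option (one parameter per line), assuming a trimmed-down holder class: `ContainerParamsSketch` is a hypothetical name, only the four numeric fields visible in the diff are kept, and the environment map is left out for brevity.

```java
// Hypothetical, trimmed-down parameter holder; field names follow the diff.
final class ContainerParamsSketch {

    final long totalContainerMemoryMB;
    final long taskManagerHeapSizeMB;
    final long taskManagerDirectMemoryLimitMB;
    final int numSlots;

    // One parameter per line: every line of the signature has the same shape.
    ContainerParamsSketch(
            long totalContainerMemoryMB,
            long taskManagerHeapSizeMB,
            long taskManagerDirectMemoryLimitMB,
            int numSlots) {
        this.totalContainerMemoryMB = totalContainerMemoryMB;
        this.taskManagerHeapSizeMB = taskManagerHeapSizeMB;
        this.taskManagerDirectMemoryLimitMB = taskManagerDirectMemoryLimitMB;
        this.numSlots = numSlots;
    }
}
```

The alternative mentioned in the comment, several parameters on every line, keeps the signature shorter; the point is only that the two layouts are not mixed within one signature.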




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199245#comment-15199245
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56479659
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ShutdownClusterAfterJob.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Message sent to the cluster framework master to signal it that the cluster
+ * should be shut down upon completion of a certain job.
+ */
+public class ShutdownClusterAfterJob implements java.io.Serializable {
--- End diff --

This should replace the Yarn shutdown logic. I've integrated it now to 
replace `StopAMAfterJob`.
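
For readers without the PR at hand, here is a minimal sketch of how a "shut down after this job" message might be handled. It is not the PR's code: the names `ShutdownAfterJob`, `Master`, `handle` and `jobFinished` are hypothetical, and a plain `String` stands in for Flink's `JobID` so the example compiles on its own.

```java
import java.io.Serializable;
import java.util.Objects;

public class ShutdownAfterJobSketch {

    /** Hypothetical message: asks the master to shut the cluster down once the given job completes. */
    public static final class ShutdownAfterJob implements Serializable {
        private static final long serialVersionUID = 1L;

        // A String stands in for Flink's JobID (assumption, to keep the sketch self-contained).
        private final String jobId;

        public ShutdownAfterJob(String jobId) {
            this.jobId = Objects.requireNonNull(jobId);
        }

        public String jobId() {
            return jobId;
        }
    }

    /** Hypothetical receiver: remembers which job triggers the shutdown. */
    public static final class Master {
        private String stopAfterJob;
        private boolean shutDown;

        public void handle(ShutdownAfterJob message) {
            stopAfterJob = message.jobId();
        }

        public void jobFinished(String jobId) {
            // Only the job named in the message triggers the shutdown.
            if (jobId.equals(stopAfterJob)) {
                shutDown = true;
            }
        }

        public boolean isShutDown() {
            return shutDown;
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        master.handle(new ShutdownAfterJob("job-a"));
        master.jobFinished("job-b");
        System.out.println(master.isShutDown());
        master.jobFinished("job-a");
        System.out.println(master.isShutDown());
    }
}
```

Keeping the message free of Yarn-specific types is what would let the same mechanism serve other cluster frameworks.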




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199246#comment-15199246
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56479664
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatus.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+/**
+ * This message signals the resource master to check how many TaskManagers are
+ * desired, how many are available, and to trigger adjustments if needed.
+ */
+public class GetClusterStatus implements java.io.Serializable {
--- End diff --

Same here. General mechanism. Implemented now to replace 
`PollYarnClusterStatus`.




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199240#comment-15199240
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56479509
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 ---
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *
+ * Worker allocation steps
+ *
+ * 
+ * The resource manager decides to request more workers. This can happen in order
+ * to fill the initial pool, or as a result of the JobManager requesting more workers.
+ *
+ * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
+ * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
+ * should reflect the pending requests.
 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197682#comment-15197682
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56372798
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListenerSuccessful.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * This message signals to the application client that the registration was successful.
+ */
+public class RegisterInfoMessageListenerSuccessful implements RequiresLeaderSessionID, java.io.Serializable {
+
+   private static final long serialVersionUID = 7808628311617273755L;
+
+   /** The singleton instance */
+   private static final RegisterInfoMessageListenerSuccessful INSTANCE = new RegisterInfoMessageListenerSuccessful();
+
+   /**
+* Gets the singleton instance.
+* @return The singleton instance.
+*/
+   public static RegisterInfoMessageListenerSuccessful get() {
+   return INSTANCE;
+   }
+
+   // 

+
+   /** Private constructor to prevent instantiation */
+   private RegisterInfoMessageListenerSuccessful() {}
+
+   // 

+
+   @Override
+   public boolean equals(Object obj) {
+   return obj != null && obj.getClass() == RegisterInfoMessageListenerSuccessful.class;
+   }
+
+   @Override
+   public int hashCode() {
+   return 2018741654;
--- End diff --

This class and the `RegisterInfoMessageListener` message have the same hash 
value.
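
A minimal sketch of the concern, not the PR's code: two stateless singleton messages that each return a constant hash code. The class names `ListenerRequest` and `ListenerRegistered` are hypothetical, and the second constant is arbitrary; only `2018741654` comes from the diff above. Equal hash codes for unequal objects do not break the `hashCode` contract, but distinct constants avoid a guaranteed collision in hash-based collections.

```java
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

public class SingletonMessages {

    /** Hypothetical stateless request message; every instance is considered equal. */
    public static final class ListenerRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final ListenerRequest INSTANCE = new ListenerRequest();

        private ListenerRequest() {}

        public static ListenerRequest get() {
            return INSTANCE;
        }

        @Override
        public boolean equals(Object obj) {
            return obj != null && obj.getClass() == ListenerRequest.class;
        }

        @Override
        public int hashCode() {
            return 2018741654; // constant taken from the diff above
        }

        /** Preserves the singleton across Java deserialization. */
        private Object readResolve() {
            return INSTANCE;
        }
    }

    /** Hypothetical stateless response message with its own, different constant. */
    public static final class ListenerRegistered implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final ListenerRegistered INSTANCE = new ListenerRegistered();

        private ListenerRegistered() {}

        public static ListenerRegistered get() {
            return INSTANCE;
        }

        @Override
        public boolean equals(Object obj) {
            return obj != null && obj.getClass() == ListenerRegistered.class;
        }

        @Override
        public int hashCode() {
            return 2018741655; // arbitrary value, chosen only to differ from the request
        }

        /** Preserves the singleton across Java deserialization. */
        private Object readResolve() {
            return INSTANCE;
        }
    }

    public static void main(String[] args) {
        Set<Object> seen = new HashSet<>();
        seen.add(ListenerRequest.get());
        seen.add(ListenerRegistered.get());
        // Both messages are stored; with distinct constants they also land in different hash buckets.
        System.out.println(seen.size());
    }
}
```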




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197622#comment-15197622
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56366895
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 ---
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *
+ * Worker allocation steps
+ *
+ * 
+ * The resource manager decides to request more workers. This can happen in order
+ * to fill the initial pool, or as a result of the JobManager requesting more workers.
+ *
+ * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
+ * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
+ * should reflect the pending 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199247#comment-15199247
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56479675
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+import java.io.Serializable;
+
+public class GetClusterStatusResponse implements Serializable {
--- End diff --

Thanks. Also integrated and replaces `FlinkYarnClusterStatus`. 




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197651#comment-15197651
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56368934
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 ---
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *
+ * Worker allocation steps
+ *
+ * 
+ * The resource manager decides to request more workers. This can happen in order
+ * to fill the initial pool, or as a result of the JobManager requesting more workers.
+ *
+ * The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
+ * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
+ * should reflect the pending 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197733#comment-15197733
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56377258
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * This message is teh successful response to a heartbeat. 
--- End diff --

typo: the




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197684#comment-15197684
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56373029
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManagerSuccessful.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Message that informs the resource manager that the JobManager accepted its registration.
+ * Carries information about the JobManager, and about the TaskManagers that the JobManager
+ * still has registered.
+ */
+public class RegisterResourceManagerSuccessful implements RequiresLeaderSessionID, Serializable {
+
+   private static final long serialVersionUID = 817011779310941753L;
+
+   /** The JobManager which we registered with. */
+   private final ActorRef jobManager;
+
+   /** The list of registered TaskManagers that the JobManager currently knows */
+   private final Collection currentlyRegisteredTaskManagers;
+
+
--- End diff --

Two line breaks




[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199278#comment-15199278
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56483710
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import com.typesafe.config.Config;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.BindException;
+import java.net.ServerSocket;
+import java.util.Iterator;
+
+/**
+ * Tools for starting JobManager and TaskManager processes, including the
+ * Actor Systems used to run the JobManager and TaskManager actors.
+ */
+public class BootstrapTools {
+
+   /**
+* Starts an ActorSystem with the given configuration listening at the address/ports.
+* @param configuration The Flink configuration
+* @param listeningAddress The address to listen at.
+* @param portRangeDefinition The port range to choose a port from.
+* @param logger The logger to output log information.
+* @return The ActorSystem which has been started
+* @throws Exception
+*/
+   public static ActorSystem startActorSystem(Configuration configuration,
+   String listeningAddress,
+   String portRangeDefinition,
+   Logger logger) throws Exception {
+
+   // parse port range definition and create port iterator
+   Iterator portsIterator;
+   try {
+   portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+   } catch (Exception e) {
+   throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+   }
+
+   while (portsIterator.hasNext()) {
+   // first, we check if the port is available by opening a socket
+   // if the actor system fails to start on the port, we try further
+   ServerSocket availableSocket = NetUtils.createSocketFromPorts(
+   portsIterator,
+   new NetUtils.SocketFactory() {
+   @Override
+   public ServerSocket createSocket(int port) throws IOException {
+   return new ServerSocket(port);
+   }
+   });
+
+   int port;
+   if (availableSocket == null) {
+   throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
+   } else {
+   port = 
