[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4271


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r129532134
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws 
Exception {
taskManagerMemoryMb =  yarnMinAllocationMB;
}
 
-   // Create application via yarnClient
-   final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
-   GetNewApplicationResponse appResponse = 
yarnApplication.getNewApplicationResponse();
-
-   Resource maxRes = appResponse.getMaximumResourceCapability();
final String note = "Please check the 
'yarn.scheduler.maximum-allocation-mb' and the 
'yarn.nodemanager.resource.memory-mb' configuration values\n";
-   if (jobManagerMemoryMb > maxRes.getMemory()) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
+   if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) 
{
throw new YarnDeploymentException("The cluster does not 
have the requested resources for the JobManager available!\n"
-   + "Maximum Memory: " + maxRes.getMemory() + "MB 
Requested: " + jobManagerMemoryMb + "MB. " + note);
+   + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + 
"MB. " + note);
}
 
-   if (taskManagerMemoryMb > maxRes.getMemory()) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
+   if (taskManagerMemoryMb > 
maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not 
have the requested resources for the TaskManagers available!\n"
-   + "Maximum Memory: " + maxRes.getMemory() + " 
Requested: " + taskManagerMemoryMb + "MB. " + note);
+   + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + 
"MB. " + note);
}
 
final String noteRsc = "\nThe Flink YARN client will try to 
allocate the YARN session, but maybe not all TaskManagers are " +
"connecting from the beginning because the resources 
are currently not available in the cluster. " +
"The allocation might take more time than usual because 
the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + 
taskManagerMemoryMb * taskManagerCount;
-   ClusterResourceDescription freeClusterMem;
-   try {
-   freeClusterMem = 
getCurrentFreeClusterResources(yarnClient);
-   } catch (YarnException | IOException e) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
-   throw new YarnDeploymentException("Could not retrieve 
information about free cluster resources.", e);
-   }
 
-   if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+
+   if (freeClusterResources.totalFreeMemory < totalMemoryRequired) 
{
LOG.warn("This YARN session requires " + 
totalMemoryRequired + "MB of memory in the cluster. "
-   + "There are currently only " + 
freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
+   + "There are currently only " + 
freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
 
}
-   if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
+   if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the 
TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
-   + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
+   + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
}
-   if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
+   if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the 
JobManager (" + jobManagerMemoryMb + "MB) is more than "
-   + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
+   + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
}
 
// 

[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r129531644
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
@@ -631,12 +634,15 @@ protected static void sendOutput() {
protected static class Runner extends Thread {
private final String[] args;
private final int expectedReturnValue;
+   private final org.apache.flink.configuration.Configuration configuration;
+
private RunTypes type;
private FlinkYarnSessionCli yCli;
private Throwable runnerError;
 
-   public Runner(String[] args, RunTypes type, int expectedReturnValue) {
+   public Runner(String[] args, org.apache.flink.configuration.Configuration configuration, RunTypes type, int expectedReturnValue) {
--- End diff --

True. Then I think it is not needed.




[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r129526528
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws 
Exception {
taskManagerMemoryMb =  yarnMinAllocationMB;
}
 
-   // Create application via yarnClient
-   final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
-   GetNewApplicationResponse appResponse = 
yarnApplication.getNewApplicationResponse();
-
-   Resource maxRes = appResponse.getMaximumResourceCapability();
final String note = "Please check the 
'yarn.scheduler.maximum-allocation-mb' and the 
'yarn.nodemanager.resource.memory-mb' configuration values\n";
-   if (jobManagerMemoryMb > maxRes.getMemory()) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
+   if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) 
{
throw new YarnDeploymentException("The cluster does not 
have the requested resources for the JobManager available!\n"
-   + "Maximum Memory: " + maxRes.getMemory() + "MB 
Requested: " + jobManagerMemoryMb + "MB. " + note);
+   + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + 
"MB. " + note);
}
 
-   if (taskManagerMemoryMb > maxRes.getMemory()) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
+   if (taskManagerMemoryMb > 
maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not 
have the requested resources for the TaskManagers available!\n"
-   + "Maximum Memory: " + maxRes.getMemory() + " 
Requested: " + taskManagerMemoryMb + "MB. " + note);
+   + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + 
"MB. " + note);
}
 
final String noteRsc = "\nThe Flink YARN client will try to 
allocate the YARN session, but maybe not all TaskManagers are " +
"connecting from the beginning because the resources 
are currently not available in the cluster. " +
"The allocation might take more time than usual because 
the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + 
taskManagerMemoryMb * taskManagerCount;
-   ClusterResourceDescription freeClusterMem;
-   try {
-   freeClusterMem = 
getCurrentFreeClusterResources(yarnClient);
-   } catch (YarnException | IOException e) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
-   throw new YarnDeploymentException("Could not retrieve 
information about free cluster resources.", e);
-   }
 
-   if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+
+   if (freeClusterResources.totalFreeMemory < totalMemoryRequired) 
{
LOG.warn("This YARN session requires " + 
totalMemoryRequired + "MB of memory in the cluster. "
-   + "There are currently only " + 
freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
+   + "There are currently only " + 
freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
 
}
-   if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
+   if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the 
TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
-   + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
+   + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
}
-   if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
+   if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the 
JobManager (" + jobManagerMemoryMb + "MB) is more than "
-   + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
+   + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
}
 
// 

[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r129526492
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+/**
+ * Description of the cluster to start by the {@link ClusterDescriptor}.
+ */
+public class ClusterSpecification {
+   private final int masterMemoryMB;
+   private final int taskManagerMemoryMB;
+   private final int numberTaskManagers;
+   private final int slotsPerTaskManager;
+
+   public ClusterSpecification(int masterMemoryMB, int 
taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
+   this.masterMemoryMB = masterMemoryMB;
+   this.taskManagerMemoryMB = taskManagerMemoryMB;
+   this.numberTaskManagers = numberTaskManagers;
+   this.slotsPerTaskManager = slotsPerTaskManager;
+   }
+
+   public int getMasterMemoryMB() {
+   return masterMemoryMB;
+   }
+
+   public int getTaskManagerMemoryMB() {
+   return taskManagerMemoryMB;
+   }
+
+   public int getNumberTaskManagers() {
+   return numberTaskManagers;
+   }
+
+   public int getSlotsPerTaskManager() {
+   return slotsPerTaskManager;
+   }
+
+   @Override
+   public String toString() {
+   return "ClusterSpecification{" +
+   "masterMemoryMB=" + masterMemoryMB +
+   ", taskManagerMemoryMB=" + taskManagerMemoryMB +
+   ", numberTaskManagers=" + numberTaskManagers +
+   ", slotsPerTaskManager=" + slotsPerTaskManager +
+   '}';
+   }
+
+   public static ClusterSpecification fromConfiguration(Configuration 
configuration) {
+   int slots = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+
+   int jobManagerMemoryMb = 
configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+   int taskManagerMemoryMb = 
configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+
+
--- End diff --

Good catch.




[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r129526011
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+/**
+ * Description of the cluster to start by the {@link ClusterDescriptor}.
+ */
+public class ClusterSpecification {
+   private final int masterMemoryMB;
+   private final int taskManagerMemoryMB;
+   private final int numberTaskManagers;
+   private final int slotsPerTaskManager;
+
+   public ClusterSpecification(int masterMemoryMB, int 
taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
--- End diff --

Good point. I'll add it.




[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r125944216
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws 
Exception {
taskManagerMemoryMb =  yarnMinAllocationMB;
}
 
-   // Create application via yarnClient
-   final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
-   GetNewApplicationResponse appResponse = 
yarnApplication.getNewApplicationResponse();
-
-   Resource maxRes = appResponse.getMaximumResourceCapability();
final String note = "Please check the 
'yarn.scheduler.maximum-allocation-mb' and the 
'yarn.nodemanager.resource.memory-mb' configuration values\n";
-   if (jobManagerMemoryMb > maxRes.getMemory()) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
+   if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) 
{
throw new YarnDeploymentException("The cluster does not 
have the requested resources for the JobManager available!\n"
-   + "Maximum Memory: " + maxRes.getMemory() + "MB 
Requested: " + jobManagerMemoryMb + "MB. " + note);
+   + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + 
"MB. " + note);
}
 
-   if (taskManagerMemoryMb > maxRes.getMemory()) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
+   if (taskManagerMemoryMb > 
maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not 
have the requested resources for the TaskManagers available!\n"
-   + "Maximum Memory: " + maxRes.getMemory() + " 
Requested: " + taskManagerMemoryMb + "MB. " + note);
+   + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + 
"MB. " + note);
}
 
final String noteRsc = "\nThe Flink YARN client will try to 
allocate the YARN session, but maybe not all TaskManagers are " +
"connecting from the beginning because the resources 
are currently not available in the cluster. " +
"The allocation might take more time than usual because 
the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + 
taskManagerMemoryMb * taskManagerCount;
-   ClusterResourceDescription freeClusterMem;
-   try {
-   freeClusterMem = 
getCurrentFreeClusterResources(yarnClient);
-   } catch (YarnException | IOException e) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
-   throw new YarnDeploymentException("Could not retrieve 
information about free cluster resources.", e);
-   }
 
-   if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+
+   if (freeClusterResources.totalFreeMemory < totalMemoryRequired) 
{
LOG.warn("This YARN session requires " + 
totalMemoryRequired + "MB of memory in the cluster. "
-   + "There are currently only " + 
freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
+   + "There are currently only " + 
freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
 
}
-   if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
+   if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the 
TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
-   + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
+   + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
}
-   if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
+   if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the 
JobManager (" + jobManagerMemoryMb + "MB) is more than "
-   + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
+   + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
}
 
// 

[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r125943654
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
@@ -631,12 +634,15 @@ protected static void sendOutput() {
protected static class Runner extends Thread {
private final String[] args;
private final int expectedReturnValue;
+   private final org.apache.flink.configuration.Configuration configuration;
+
private RunTypes type;
private FlinkYarnSessionCli yCli;
private Throwable runnerError;
 
-   public Runner(String[] args, RunTypes type, int expectedReturnValue) {
+   public Runner(String[] args, org.apache.flink.configuration.Configuration configuration, RunTypes type, int expectedReturnValue) {
--- End diff --

What is the `configuration` argument for? It doesn't appear to be used
anywhere.




[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r125944668
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws 
Exception {
taskManagerMemoryMb =  yarnMinAllocationMB;
}
 
-   // Create application via yarnClient
-   final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
-   GetNewApplicationResponse appResponse = 
yarnApplication.getNewApplicationResponse();
-
-   Resource maxRes = appResponse.getMaximumResourceCapability();
final String note = "Please check the 
'yarn.scheduler.maximum-allocation-mb' and the 
'yarn.nodemanager.resource.memory-mb' configuration values\n";
-   if (jobManagerMemoryMb > maxRes.getMemory()) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
+   if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) 
{
throw new YarnDeploymentException("The cluster does not 
have the requested resources for the JobManager available!\n"
-   + "Maximum Memory: " + maxRes.getMemory() + "MB 
Requested: " + jobManagerMemoryMb + "MB. " + note);
+   + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + 
"MB. " + note);
}
 
-   if (taskManagerMemoryMb > maxRes.getMemory()) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
+   if (taskManagerMemoryMb > 
maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not 
have the requested resources for the TaskManagers available!\n"
-   + "Maximum Memory: " + maxRes.getMemory() + " 
Requested: " + taskManagerMemoryMb + "MB. " + note);
+   + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + 
"MB. " + note);
}
 
final String noteRsc = "\nThe Flink YARN client will try to 
allocate the YARN session, but maybe not all TaskManagers are " +
"connecting from the beginning because the resources 
are currently not available in the cluster. " +
"The allocation might take more time than usual because 
the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + 
taskManagerMemoryMb * taskManagerCount;
-   ClusterResourceDescription freeClusterMem;
-   try {
-   freeClusterMem = 
getCurrentFreeClusterResources(yarnClient);
-   } catch (YarnException | IOException e) {
-   failSessionDuringDeployment(yarnClient, 
yarnApplication);
-   throw new YarnDeploymentException("Could not retrieve 
information about free cluster resources.", e);
-   }
 
-   if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+
+   if (freeClusterResources.totalFreeMemory < totalMemoryRequired) 
{
LOG.warn("This YARN session requires " + 
totalMemoryRequired + "MB of memory in the cluster. "
-   + "There are currently only " + 
freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
+   + "There are currently only " + 
freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
 
}
-   if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
+   if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the 
TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
-   + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
+   + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
}
-   if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
+   if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the 
JobManager (" + jobManagerMemoryMb + "MB) is more than "
-   + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
+   + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
}
 
// 

[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r125938365
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+/**
+ * Description of the cluster to start by the {@link ClusterDescriptor}.
+ */
+public class ClusterSpecification {
+   private final int masterMemoryMB;
+   private final int taskManagerMemoryMB;
+   private final int numberTaskManagers;
+   private final int slotsPerTaskManager;
+
+   public ClusterSpecification(int masterMemoryMB, int 
taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
--- End diff --

I would add a builder for this class since constructor calls look like this 
`new ClusterSpecification(1,2,3,4)` which is rather unreadable and easily leads 
to mistakes.
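
The builder suggested above could look roughly like the following. This is only a sketch under assumptions: the `Builder` class, its setter names, and the default values are hypothetical and are not taken from the PR; only the four fields and their getters come from the quoted diff.

```java
// Hypothetical builder sketch; setter names and defaults are assumptions,
// not the code that was merged into Flink.
final class ClusterSpecification {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int numberTaskManagers;
    private final int slotsPerTaskManager;

    // Only the builder can create instances, so every call site names its values.
    private ClusterSpecification(Builder builder) {
        this.masterMemoryMB = builder.masterMemoryMB;
        this.taskManagerMemoryMB = builder.taskManagerMemoryMB;
        this.numberTaskManagers = builder.numberTaskManagers;
        this.slotsPerTaskManager = builder.slotsPerTaskManager;
    }

    int getMasterMemoryMB() { return masterMemoryMB; }
    int getTaskManagerMemoryMB() { return taskManagerMemoryMB; }
    int getNumberTaskManagers() { return numberTaskManagers; }
    int getSlotsPerTaskManager() { return slotsPerTaskManager; }

    static final class Builder {
        // Illustrative defaults (assumed, not Flink's real defaults).
        private int masterMemoryMB = 768;
        private int taskManagerMemoryMB = 768;
        private int numberTaskManagers = 1;
        private int slotsPerTaskManager = 1;

        Builder setMasterMemoryMB(int masterMemoryMB) {
            this.masterMemoryMB = masterMemoryMB;
            return this;
        }

        Builder setTaskManagerMemoryMB(int taskManagerMemoryMB) {
            this.taskManagerMemoryMB = taskManagerMemoryMB;
            return this;
        }

        Builder setNumberTaskManagers(int numberTaskManagers) {
            this.numberTaskManagers = numberTaskManagers;
            return this;
        }

        Builder setSlotsPerTaskManager(int slotsPerTaskManager) {
            this.slotsPerTaskManager = slotsPerTaskManager;
            return this;
        }

        ClusterSpecification build() {
            return new ClusterSpecification(this);
        }
    }
}

class ClusterSpecificationBuilderExample {
    public static void main(String[] args) {
        // Each value is labeled at the call site, unlike new ClusterSpecification(1,2,3,4).
        ClusterSpecification spec = new ClusterSpecification.Builder()
            .setMasterMemoryMB(1024)
            .setTaskManagerMemoryMB(2048)
            .setNumberTaskManagers(3)
            .setSlotsPerTaskManager(4)
            .build();
        System.out.println(spec.getNumberTaskManagers() + " TaskManagers with "
            + spec.getSlotsPerTaskManager() + " slots each");
    }
}
```

With named setters, swapping two `int` arguments by accident becomes much harder, which is the readability concern raised in the comment.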




[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4271#discussion_r125938448
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+/**
+ * Description of the cluster to start by the {@link ClusterDescriptor}.
+ */
+public class ClusterSpecification {
+   private final int masterMemoryMB;
+   private final int taskManagerMemoryMB;
+   private final int numberTaskManagers;
+   private final int slotsPerTaskManager;
+
+   public ClusterSpecification(int masterMemoryMB, int 
taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
+   this.masterMemoryMB = masterMemoryMB;
+   this.taskManagerMemoryMB = taskManagerMemoryMB;
+   this.numberTaskManagers = numberTaskManagers;
+   this.slotsPerTaskManager = slotsPerTaskManager;
+   }
+
+   public int getMasterMemoryMB() {
+   return masterMemoryMB;
+   }
+
+   public int getTaskManagerMemoryMB() {
+   return taskManagerMemoryMB;
+   }
+
+   public int getNumberTaskManagers() {
+   return numberTaskManagers;
+   }
+
+   public int getSlotsPerTaskManager() {
+   return slotsPerTaskManager;
+   }
+
+   @Override
+   public String toString() {
+   return "ClusterSpecification{" +
+   "masterMemoryMB=" + masterMemoryMB +
+   ", taskManagerMemoryMB=" + taskManagerMemoryMB +
+   ", numberTaskManagers=" + numberTaskManagers +
+   ", slotsPerTaskManager=" + slotsPerTaskManager +
+   '}';
+   }
+
+   public static ClusterSpecification fromConfiguration(Configuration 
configuration) {
+   int slots = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+
+   int jobManagerMemoryMb = 
configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+   int taskManagerMemoryMb = 
configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+
+
--- End diff --

Remove the second empty line.




[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

2017-07-06 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4271

[FLINK-7113] Make ClusterDescriptor independent of cluster size

This PR is based on #4270.

In order to pull some of the cluster deployment logic out of the 
`AbstractYarnClusterDescriptor`, this PR introduces a `ClusterSpecification`. 
This specification contains information about the size of the cluster to be 
started. Instead of setting these values via setters directly on the 
`ClusterDescriptor`, we provide the information at deployment time via the 
`ClusterSpecification`.

As I envision it, the `ClusterDescriptor` should only contain the
information necessary to talk to a resource manager, such as Mesos or
Yarn. All Flink-cluster-specific information should be provided to the `deploy`
methods. This also allows us to better share code between different
`ClusterDescriptor` implementations, e.g. for Mesos and Yarn.
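
To illustrate the idea, here is a minimal, self-contained sketch. It is not the actual Flink interface: the generic `ClusterDescriptor<ClientT>` signature, the `LoggingClusterDescriptor` class, and the simplified three-field `ClusterSpecification` are assumptions made for this example. Only the `deploySession` method name and the total-memory formula (master memory plus TaskManager memory times TaskManager count) come from the commit message and the quoted diff.

```java
// Simplified stand-in for the ClusterSpecification introduced by this PR.
final class ClusterSpecification {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int numberTaskManagers;

    ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers) {
        this.masterMemoryMB = masterMemoryMB;
        this.taskManagerMemoryMB = taskManagerMemoryMB;
        this.numberTaskManagers = numberTaskManagers;
    }

    int getMasterMemoryMB() { return masterMemoryMB; }
    int getTaskManagerMemoryMB() { return taskManagerMemoryMB; }
    int getNumberTaskManagers() { return numberTaskManagers; }
}

// Hypothetical interface: the cluster size is an argument of the deploy call,
// not state that is set on the descriptor beforehand.
interface ClusterDescriptor<ClientT> {
    ClientT deploySession(ClusterSpecification clusterSpecification) throws Exception;
}

// Toy implementation (assumed): it only holds what is needed to address a
// resource manager, here reduced to a name, and returns a description string.
final class LoggingClusterDescriptor implements ClusterDescriptor<String> {
    private final String resourceManager;

    LoggingClusterDescriptor(String resourceManager) {
        this.resourceManager = resourceManager;
    }

    @Override
    public String deploySession(ClusterSpecification spec) {
        // Same formula as totalMemoryRequired in the quoted diff.
        int totalMemoryMB = spec.getMasterMemoryMB()
            + spec.getTaskManagerMemoryMB() * spec.getNumberTaskManagers();
        return resourceManager + ": " + spec.getNumberTaskManagers()
            + " TaskManagers, " + totalMemoryMB + "MB total";
    }
}

class ClusterDescriptorExample {
    public static void main(String[] args) {
        LoggingClusterDescriptor descriptor = new LoggingClusterDescriptor("yarn");
        // One descriptor can deploy clusters of different sizes.
        System.out.println(descriptor.deploySession(new ClusterSpecification(1024, 2048, 3)));
        System.out.println(descriptor.deploySession(new ClusterSpecification(768, 1024, 1)));
    }
}
```

Because the descriptor carries no size information, the same instance can be reused for differently sized deployments.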

This is only the start of a refactoring of the `ClusterDescriptor`. Much 
more information can be pulled out of the `AbstractYarnClusterDescriptor`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink refactorClusterDescriptor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4271.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4271


commit 9cfb9718d7b64e253aaf636dc17b8939e2592a98
Author: Till Rohrmann 
Date:   2017-07-06T07:33:18Z

[FLINK-7110] [client] Add per-job cluster deployment interface

commit 0143d7e33b7d99ce8371e455458b730ba2c13d06
Author: Till Rohrmann 
Date:   2017-07-06T12:00:21Z

[FLINK-7113] Make ClusterDescriptor independent of cluster size

The deploySession method now is given a ClusterSpecification which 
specifies the
size of the cluster which it is supposed to deploy.



