This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e959e6d  [FLINK-5542][yarn] Use YarnCluster vcores setting to do 
MaxVCore validation.
e959e6d is described below

commit e959e6d0cd42f0c5b21c0f03ce547f2025ac58d5
Author: xuewei.linxuewei <xuewei.linxue...@alibaba-inc.com>
AuthorDate: Fri Sep 28 18:15:25 2018 +0800

    [FLINK-5542][yarn] Use YarnCluster vcores setting to do MaxVCore validation.
    
    This closes #6775.
---
 .../flink/yarn/AbstractYarnClusterDescriptor.java  | 27 ++++++++++++++--------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c3ad9f7..c161e22 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -282,18 +282,27 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
 
                // Check if we don't exceed YARN's maximum virtual cores.
-               // The number of cores can be configured in the config.
-               // If not configured, it is set to the number of task slots
-               int numYarnVcores = 
yarnConfiguration.getInt(YarnConfiguration.NM_VCORES, 
YarnConfiguration.DEFAULT_NM_VCORES);
+               // Fetch numYarnMaxVcores from all the RUNNING nodes via 
yarnClient
+               final int numYarnMaxVcores;
+               try {
+                       numYarnMaxVcores = 
yarnClient.getNodeReports(NodeState.RUNNING)
+                               .stream()
+                               .mapToInt(report -> 
report.getCapability().getVirtualCores())
+                               .max()
+                               .orElse(0);
+               } catch (Exception e) {
+                       throw new YarnDeploymentException("Couldn't get cluster 
description, please check on the YarnConfiguration", e);
+               }
+
                int configuredVcores = 
flinkConfiguration.getInteger(YarnConfigOptions.VCORES, 
clusterSpecification.getSlotsPerTaskManager());
                // don't configure more than the maximum configured number of 
vcores
-               if (configuredVcores > numYarnVcores) {
+               if (configuredVcores > numYarnMaxVcores) {
                        throw new IllegalConfigurationException(
-                               String.format("The number of virtual cores per 
node were configured with %d" +
-                                               " but Yarn only has %d virtual 
cores available. Please note that the number" +
-                                               " of virtual cores is set to 
the number of task slots by default unless configured" +
-                                               " in the Flink config with 
'%s.'",
-                                       configuredVcores, numYarnVcores, 
YarnConfigOptions.VCORES.key()));
+                               String.format("The number of requested virtual 
cores per node %d" +
+                                               " exceeds the maximum number of 
virtual cores %d available in the Yarn Cluster." +
+                                               " Please note that the number 
of virtual cores is set to the number of task slots by default" +
+                                               " unless configured in the 
Flink config with '%s.'",
+                                       configuredVcores, numYarnMaxVcores, 
YarnConfigOptions.VCORES.key()));
                }
 
                // check if required Hadoop environment variables are set. If 
not, warn user

Reply via email to