This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new e959e6d  [FLINK-5542][yarn] Use YarnCluster vcores setting to do MaxVCore validation.
e959e6d is described below

commit e959e6d0cd42f0c5b21c0f03ce547f2025ac58d5
Author: xuewei.linxuewei <xuewei.linxue...@alibaba-inc.com>
AuthorDate: Fri Sep 28 18:15:25 2018 +0800

    [FLINK-5542][yarn] Use YarnCluster vcores setting to do MaxVCore validation.

    This closes #6775.
---
 .../flink/yarn/AbstractYarnClusterDescriptor.java  | 27 ++++++++++++++--------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c3ad9f7..c161e22 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -282,18 +282,27 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		// Check if we don't exceed YARN's maximum virtual cores.
-		// The number of cores can be configured in the config.
-		// If not configured, it is set to the number of task slots
-		int numYarnVcores = yarnConfiguration.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
+		// Fetch numYarnMaxVcores from all the RUNNING nodes via yarnClient
+		final int numYarnMaxVcores;
+		try {
+			numYarnMaxVcores = yarnClient.getNodeReports(NodeState.RUNNING)
+				.stream()
+				.mapToInt(report -> report.getCapability().getVirtualCores())
+				.max()
+				.orElse(0);
+		} catch (Exception e) {
+			throw new YarnDeploymentException("Couldn't get cluster description, please check on the YarnConfiguration", e);
+		}
+
 		int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
 		// don't configure more than the maximum configured number of vcores
-		if (configuredVcores > numYarnVcores) {
+		if (configuredVcores > numYarnMaxVcores) {
 			throw new IllegalConfigurationException(
-				String.format("The number of virtual cores per node were configured with %d" +
-					" but Yarn only has %d virtual cores available. Please note that the number" +
-					" of virtual cores is set to the number of task slots by default unless configured" +
-					" in the Flink config with '%s.'",
-					configuredVcores, numYarnVcores, YarnConfigOptions.VCORES.key()));
+				String.format("The number of requested virtual cores per node %d" +
+					" exceeds the maximum number of virtual cores %d available in the Yarn Cluster." +
+					" Please note that the number of virtual cores is set to the number of task slots by default" +
+					" unless configured in the Flink config with '%s.'",
+					configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
 		}
 
 		// check if required Hadoop environment variables are set. If not, warn user