Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4271#discussion_r125944668 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws Exception { taskManagerMemoryMb = yarnMinAllocationMB; } - // Create application via yarnClient - final YarnClientApplication yarnApplication = yarnClient.createApplication(); - GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse(); - - Resource maxRes = appResponse.getMaximumResourceCapability(); final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n"; - if (jobManagerMemoryMb > maxRes.getMemory()) { - failSessionDuringDeployment(yarnClient, yarnApplication); + if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) { throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n" - + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note); + + "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note); } - if (taskManagerMemoryMb > maxRes.getMemory()) { - failSessionDuringDeployment(yarnClient, yarnApplication); + if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) { throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n" - + "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note); + + "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note); } final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " + "connecting from the beginning because the resources are currently not available in the cluster. " + "The allocation might take more time than usual because the Flink YARN client needs to wait until " + "the resources become available."; int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount; - ClusterResourceDescription freeClusterMem; - try { - freeClusterMem = getCurrentFreeClusterResources(yarnClient); - } catch (YarnException | IOException e) { - failSessionDuringDeployment(yarnClient, yarnApplication); - throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e); - } - if (freeClusterMem.totalFreeMemory < totalMemoryRequired) { + + if (freeClusterResources.totalFreeMemory < totalMemoryRequired) { LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. " - + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc); + + "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc); } - if (taskManagerMemoryMb > freeClusterMem.containerLimit) { + if (taskManagerMemoryMb > freeClusterResources.containerLimit) { LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than " - + "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc); + + "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc); } - if (jobManagerMemoryMb > freeClusterMem.containerLimit) { + if (jobManagerMemoryMb > freeClusterResources.containerLimit) { LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than " - + "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc); + + "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc); } // ----------------- check if the requested containers fit into the cluster. - int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length); + int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length); // first, allocate the jobManager somewhere. if (!allocateResource(nmFree, jobManagerMemoryMb)) { LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " + "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " + - Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc); + Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc); } // allocate TaskManagers for (int i = 0; i < taskManagerCount; i++) { if (!allocateResource(nmFree, taskManagerMemoryMb)) { LOG.warn("There is not enough memory available in the YARN cluster. " + "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " + - "NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" + + "NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" + "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " + "the following NodeManagers are available: " + Arrays.toString(nmFree) + noteRsc); } } - ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication); + return new ClusterSpecification( + jobManagerMemoryMb, + taskManagerMemoryMb, + clusterSpecification.getNumberTaskManagers(), + clusterSpecification.getSlotsPerTaskManager()); + + } + + protected void logClusterSpecification(ClusterSpecification clusterSpecification) { + LOG.info("Cluster specification: {}", clusterSpecification); + } + + /** + * This method will block until the ApplicationMaster/JobManager have been + * deployed on YARN. + */ + protected YarnClusterClient deployInternal(ClusterSpecification clusterSpecification) throws Exception { + + isReadyForDeployment(clusterSpecification); + + final YarnClient yarnClient = getYarnClient(); + + // ------------------ Check if the specified queue exists -------------------- + + checkYarnQueues(yarnClient); + + // ------------------ Add dynamic properties to local flinkConfiguraton ------ + Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded); + for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) { + flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue()); + } + --- End diff -- semove second empty line
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---