This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0c4db2dd577 [FLINK-31510][yarn] Replace deprecated getMemory by getMemorySize 0c4db2dd577 is described below commit 0c4db2dd577fdfca1a12c115948bf67931d0bcde Author: slfan1989 <louj1988@@> AuthorDate: Sun Apr 9 08:54:13 2023 +0800 [FLINK-31510][yarn] Replace deprecated getMemory by getMemorySize This closes #22207 --- ...rocessSpecContainerResourcePriorityAdapter.java | 2 +- .../apache/flink/yarn/YarnClusterDescriptor.java | 34 +++++++++++----------- ...ssSpecContainerResourcePriorityAdapterTest.java | 4 +-- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapter.java b/flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapter.java index 1fb1bc2b421..bfe4da71e45 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapter.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapter.java @@ -123,7 +123,7 @@ public class TaskExecutorProcessSpecContainerResourcePriorityAdapter { taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), taskExecutorProcessSpec.getCpuCores().getValue().intValue()); - if (resource.getMemory() > maxContainerResource.getMemory() + if (resource.getMemorySize() > maxContainerResource.getMemorySize() || resource.getVirtualCores() > maxContainerResource.getVirtualCores()) { LOG.warn( "Requested container resource ({}) exceeds the max limitation of the Yarn cluster ({}). Will not allocate resource.", diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 965c608e38d..c2d830f9c3b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -672,22 +672,22 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n"; - if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) { + if (jobManagerMemoryMb > maximumResourceCapability.getMemorySize()) { throw new YarnDeploymentException( "The cluster does not have the requested resources for the JobManager available!\n" + "Maximum Memory: " - + maximumResourceCapability.getMemory() + + maximumResourceCapability.getMemorySize() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note); } - if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) { + if (taskManagerMemoryMb > maximumResourceCapability.getMemorySize()) { throw new YarnDeploymentException( "The cluster does not have the requested resources for the TaskManagers available!\n" + "Maximum Memory: " - + maximumResourceCapability.getMemory() + + maximumResourceCapability.getMemorySize() + " Requested: " + taskManagerMemoryMb + "MB. " @@ -1196,7 +1196,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { // Set up resource type requirements for ApplicationMaster Resource capability = Records.newRecord(Resource.class); - capability.setMemory(clusterSpecification.getMasterMemoryMB()); + capability.setMemorySize(clusterSpecification.getMasterMemoryMB()); capability.setVirtualCores( flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES)); @@ -1390,12 +1390,12 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { } private static class ClusterResourceDescription { - public final int totalFreeMemory; - public final int containerLimit; - public final int[] nodeManagersFree; + public final long totalFreeMemory; + public final long containerLimit; + public final long[] nodeManagersFree; public ClusterResourceDescription( - int totalFreeMemory, int containerLimit, int[] nodeManagersFree) { + long totalFreeMemory, long containerLimit, long[] nodeManagersFree) { this.totalFreeMemory = totalFreeMemory; this.containerLimit = containerLimit; this.nodeManagersFree = nodeManagersFree; @@ -1407,14 +1407,14 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING); int totalFreeMemory = 0; - int containerLimit = 0; - int[] nodeManagersFree = new int[nodes.size()]; + long containerLimit = 0; + long[] nodeManagersFree = new long[nodes.size()]; for (int i = 0; i < nodes.size(); i++) { NodeReport rep = nodes.get(i); - int free = - rep.getCapability().getMemory() - - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0); + long free = + rep.getCapability().getMemorySize() + - (rep.getUsed() != null ? rep.getUsed().getMemorySize() : 0); nodeManagersFree[i] = free; totalFreeMemory += free; if (free > containerLimit) { @@ -1438,14 +1438,14 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { final String format = "|%-16s |%-16s %n"; ps.printf("|Property |Value %n"); ps.println("+---------------------------------------+"); - int totalMemory = 0; + long totalMemory = 0; int totalCores = 0; for (NodeReport rep : nodes) { final Resource res = rep.getCapability(); - totalMemory += res.getMemory(); + totalMemory += res.getMemorySize(); totalCores += res.getVirtualCores(); ps.format(format, "NodeID", rep.getNodeId()); - ps.format(format, "Memory", getDisplayMemory(res.getMemory())); + ps.format(format, "Memory", getDisplayMemory(res.getMemorySize())); ps.format(format, "vCores", res.getVirtualCores()); ps.format(format, "HealthReport", rep.getHealthReport()); ps.format(format, "Containers", rep.getNumContainers()); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java index afcaf2ddf7e..1832f160466 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java @@ -140,7 +140,7 @@ class TaskExecutorProcessSpecContainerResourcePriorityAdapterTest { void testGetResourceFromSpec() { final TaskExecutorProcessSpecContainerResourcePriorityAdapter adapter = getAdapter(); final Resource resource = getResource(adapter, TASK_EXECUTOR_PROCESS_SPEC_1); - assertThat(resource.getMemory()) + assertThat(resource.getMemorySize()) .isEqualTo(TASK_EXECUTOR_PROCESS_SPEC_1.getTotalProcessMemorySize().getMebiBytes()); assertThat(resource.getVirtualCores()) .isEqualTo(TASK_EXECUTOR_PROCESS_SPEC_1.getCpuCores().getValue().intValue()); @@ -253,7 +253,7 @@ class TaskExecutorProcessSpecContainerResourcePriorityAdapterTest { getAdapterWithExternalResources(String resourceName, String configKey) { final Resource maxResource = Resource.newInstance( - MAX_CONTAINER_RESOURCE.getMemory(), + MAX_CONTAINER_RESOURCE.getMemorySize(), MAX_CONTAINER_RESOURCE.getVirtualCores()); ResourceInformationReflector.INSTANCE.setResourceInformation( maxResource,