Repository: hadoop Updated Branches: refs/heads/trunk 38996fdcf -> 147df300b
YARN-5067 Support specifying resources for AM containers in SLS. (Yufei Gu via Haibo Chen) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/147df300 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/147df300 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/147df300 Branch: refs/heads/trunk Commit: 147df300bf00b5f4ed250426b6ccdd69085466da Parents: 38996fd Author: Haibo Chen <haiboc...@apache.org> Authored: Fri Jun 30 16:50:06 2017 -0700 Committer: Haibo Chen <haiboc...@apache.org> Committed: Fri Jun 30 17:03:44 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/yarn/sls/SLSRunner.java | 38 +++++++++++++++---- .../hadoop/yarn/sls/appmaster/AMSimulator.java | 39 +++++++------------- .../yarn/sls/appmaster/MRAMSimulator.java | 11 +++--- .../hadoop/yarn/sls/conf/SLSConfiguration.java | 15 ++++++++ .../yarn/sls/appmaster/TestAMSimulator.java | 4 +- 5 files changed, 68 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/147df300/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 02da056..a534f03 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -406,7 +406,7 @@ public class SLSRunner extends Configured implements Tool { } runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, - getTaskContainers(jsonJob), null); + getTaskContainers(jsonJob), null, getAMContainerResource(jsonJob)); } private List<ContainerSimulator> getTaskContainers(Map jsonJob) @@ -558,7 +558,8 @@ public class SLSRunner extends Configured implements Tool { // Only supports the default job type currently runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, null); + jobStartTimeMS, jobFinishTimeMS, containerList, null, + getAMContainerResource(null)); } private Resource getDefaultContainerResource() { @@ -676,7 +677,8 @@ public class SLSRunner extends Configured implements Tool { } runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, rr); + jobStartTimeMS, jobFinishTimeMS, containerList, rr, + getAMContainerResource(null)); } } finally { stjp.close(); @@ -684,6 +686,26 @@ public class SLSRunner extends Configured implements Tool { } + private Resource getAMContainerResource(Map jsonJob) { + Resource amContainerResource = + SLSConfiguration.getAMContainerResource(getConf()); + + if (jsonJob == null) { + return amContainerResource; + } + + if (jsonJob.containsKey("am.memory")) { + amContainerResource.setMemorySize( + Long.parseLong(jsonJob.get("am.memory").toString())); + } + + if (jsonJob.containsKey("am.vcores")) { + amContainerResource.setVirtualCores( + Integer.parseInt(jsonJob.get("am.vcores").toString())); + } + return amContainerResource; + } + private void increaseQueueAppNum(String queue) throws YarnException { SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); String queueName = wrapper.getRealQueueName(queue); @@ -700,7 +722,7 @@ public class SLSRunner extends Configured implements Tool { private void runNewAM(String jobType, String user, String jobQueue, String oldJobId, long jobStartTimeMS, long jobFinishTimeMS, List<ContainerSimulator> containerList, - ReservationSubmissionRequest rr) { + ReservationSubmissionRequest rr, Resource amContainerResource) { AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( amClassMap.get(jobType), new Configuration()); @@ -710,9 +732,11 @@ public class SLSRunner extends Configured implements Tool { SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); boolean isTracked = trackedApps.contains(oldJobId); - amSim.init(AM_ID++, heartbeatInterval, containerList, - rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, - isTracked, oldJobId, rr, runner.getStartTimeMS()); + AM_ID++; + + amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, + jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, rr, + runner.getStartTimeMS(), amContainerResource); runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); numTasks += containerList.size(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/147df300/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 7ce3ef0..c69805e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.sls.appmaster; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; -import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; @@ -35,18 +34,13 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords - .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; - -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -54,7 +48,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -66,7 +59,6 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.apache.hadoop.yarn.sls.SLSRunner; @@ -116,9 +108,7 @@ public abstract class AMSimulator extends TaskRunner.Task { private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class); - // resource for AM container - private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; - private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1; + private Resource amContainerResource; private ReservationSubmissionRequest reservationRequest; @@ -127,11 +117,12 @@ public abstract class AMSimulator extends TaskRunner.Task { } @SuppressWarnings("checkstyle:parameternumber") - public void init(int id, int heartbeatInterval, + public void init(int heartbeatInterval, List<ContainerSimulator> containerList, ResourceManager resourceManager, SLSRunner slsRunnner, long startTime, long finishTime, String simUser, String simQueue, boolean tracked, String oldApp, - ReservationSubmissionRequest rr, long baseTimeMS) { + ReservationSubmissionRequest rr, long baseTimeMS, + Resource amContainerResource) { super.init(startTime, startTime + 1000000L * heartbeatInterval, heartbeatInterval); this.user = simUser; @@ -144,6 +135,7 @@ public abstract class AMSimulator extends TaskRunner.Task { this.traceStartTimeMS = startTime; this.traceFinishTimeMS = finishTime; this.reservationRequest = rr; + this.amContainerResource = amContainerResource; } /** @@ -318,16 +310,13 @@ public abstract class AMSimulator extends TaskRunner.Task { appSubContext.setPriority(Priority.newInstance(0)); ContainerLaunchContext conLauContext = Records.newRecord(ContainerLaunchContext.class); - conLauContext.setApplicationACLs( - new HashMap<ApplicationAccessType, String>()); - conLauContext.setCommands(new ArrayList<String>()); - conLauContext.setEnvironment(new HashMap<String, String>()); - conLauContext.setLocalResources(new HashMap<String, LocalResource>()); - conLauContext.setServiceData(new HashMap<String, ByteBuffer>()); + conLauContext.setApplicationACLs(new HashMap<>()); + conLauContext.setCommands(new ArrayList<>()); + conLauContext.setEnvironment(new HashMap<>()); + conLauContext.setLocalResources(new HashMap<>()); + conLauContext.setServiceData(new HashMap<>()); appSubContext.setAMContainerSpec(conLauContext); - appSubContext.setResource(Resources - .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, - MR_AM_CONTAINER_RESOURCE_VCORES)); + appSubContext.setResource(amContainerResource); if(reservationId != null) { appSubContext.setReservationID(reservationId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/147df300/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 7ac30ab..21bf054 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -114,14 +115,14 @@ public class MRAMSimulator extends AMSimulator { LoggerFactory.getLogger(MRAMSimulator.class); @SuppressWarnings("checkstyle:parameternumber") - public void init(int id, int heartbeatInterval, + public void init(int heartbeatInterval, List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, long traceStartTime, long traceFinishTime, String user, String queue, boolean isTracked, String oldAppId, ReservationSubmissionRequest rr, - long baselineStartTimeMS) { - super.init(id, heartbeatInterval, containerList, rm, se, - traceStartTime, traceFinishTime, user, queue, - isTracked, oldAppId, rr, baselineStartTimeMS); + long baselineStartTimeMS, Resource amContainerResource) { + super.init(heartbeatInterval, containerList, rm, se, + traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, + rr, baselineStartTimeMS, amContainerResource); amtype = "mapreduce"; // get map/reduce tasks http://git-wip-us.apache.org/repos/asf/hadoop/blob/147df300/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java index 8fd5b3f..038f202 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java @@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.sls.conf; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; @Private @Unstable @@ -62,6 +64,14 @@ public class SLSConfiguration { public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000; public static final String AM_TYPE = AM_PREFIX + "type."; + public static final String AM_CONTAINER_MEMORY = AM_PREFIX + + "container.memory"; + public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024; + + public static final String AM_CONTAINER_VCORES = AM_PREFIX + + "container.vcores"; + public static final int AM_CONTAINER_VCORES_DEFAULT = 1; + // container public static final String CONTAINER_PREFIX = PREFIX + "container."; public static final String CONTAINER_MEMORY_MB = CONTAINER_PREFIX @@ -70,4 +80,9 @@ public class SLSConfiguration { public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores"; public static final int CONTAINER_VCORES_DEFAULT = 1; + public static Resource getAMContainerResource(Configuration conf) { + return Resource.newInstance( + conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT), + conf.getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT)); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/147df300/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index 56aa219..02dc26e 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -133,8 +133,8 @@ public class TestAMSimulator { String appId = "app1"; String queue = "default"; List<ContainerSimulator> containers = new ArrayList<>(); - app.init(1, 1000, containers, rm, null, 0, 1000000L, "user1", queue, - true, appId, null, 0); + app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, + appId, null, 0, SLSConfiguration.getAMContainerResource(conf)); app.firstStep(); verifySchedulerMetrics(appId); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org