YARN-4779. Fix AM container allocation logic in SLS. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b32ffa27 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b32ffa27 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b32ffa27 Branch: refs/heads/YARN-5734 Commit: b32ffa2753e83615b980721b6067fcc35ce54372 Parents: e8694de Author: Sunil G <sun...@apache.org> Authored: Fri Feb 24 21:39:25 2017 +0530 Committer: Sunil G <sun...@apache.org> Committed: Fri Feb 24 21:39:25 2017 +0530 ---------------------------------------------------------------------- .../org/apache/hadoop/yarn/sls/SLSRunner.java | 20 +- .../hadoop/yarn/sls/appmaster/AMSimulator.java | 89 +++++--- .../yarn/sls/appmaster/MRAMSimulator.java | 218 ++++++++----------- .../sls/resourcemanager/MockAMLauncher.java | 115 ++++++++++ .../sls/scheduler/SLSCapacityScheduler.java | 24 ++ 5 files changed, 305 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 61738fb..61b7f36 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.ObjectMapper; @@ -55,12 +56,14 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; +import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; @@ -119,10 +122,10 @@ public class SLSRunner { this.printSimulation = printsimulation; metricsOutputDir = outputDir; - nmMap = new HashMap<NodeId, NMSimulator>(); - queueAppNumMap = new HashMap<String, Integer>(); - amMap = new HashMap<String, AMSimulator>(); - amClassMap = new HashMap<String, Class>(); + nmMap = new HashMap<>(); + queueAppNumMap = new HashMap<>(); + amMap = new ConcurrentHashMap<>(); + amClassMap = new HashMap<>(); // runner configuration conf = new Configuration(false); @@ -179,7 +182,14 @@ public class SLSRunner { } rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); - rm = new ResourceManager(); + + final SLSRunner se = this; + rm = new ResourceManager() { + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new MockAMLauncher(se, this.rmContext, amMap); + } + }; rm.init(rmConf); rm.start(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index d61bf02..5b03d51 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; @@ -107,11 +109,19 @@ public abstract class AMSimulator extends TaskRunner.Task { // progress protected int totalContainers; protected int finishedContainers; + + // waiting for AM container + volatile boolean isAMContainerRunning = false; + volatile Container amContainer; protected final Logger LOG = Logger.getLogger(AMSimulator.class); - + + // resource for AM container + private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; + private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1; + public AMSimulator() { - this.responseQueue = new LinkedBlockingQueue<AllocateResponse>(); + this.responseQueue = new LinkedBlockingQueue<>(); } public void init(int id, int heartbeatInterval, @@ -142,23 +152,30 @@ public abstract class AMSimulator extends TaskRunner.Task { // submit application, waiting until ACCEPTED submitApp(); - // register application master - registerAM(); - // track app metrics trackApp(); } + public synchronized void notifyAMContainerLaunched(Container masterContainer) + throws Exception { + this.amContainer = masterContainer; + this.appAttemptId = masterContainer.getId().getApplicationAttemptId(); + registerAM(); + isAMContainerRunning = true; + } + @Override public void middleStep() throws Exception { - // process responses in the queue - processResponseQueue(); - - // send out request - sendContainerRequest(); - - // check whether finish - checkStop(); + if (isAMContainerRunning) { + // process responses in the queue + processResponseQueue(); + + // send out request + sendContainerRequest(); + + // check whether finish + checkStop(); + } } @Override @@ -168,6 +185,22 @@ public abstract class AMSimulator extends TaskRunner.Task { if (isTracked) { untrackApp(); } + + // Finish AM container + if (amContainer != null) { + LOG.info("AM container = " + amContainer.getId() + " reported to finish"); + se.getNmMap().get(amContainer.getNodeId()).cleanupContainer( + amContainer.getId()); + } else { + LOG.info("AM container is null"); + } + + if (null == appAttemptId) { + // If appAttemptId == null, AM is not launched from RM's perspective, so + // it's unnecessary to finish am as well + return; + } + // unregister application master final FinishApplicationMasterRequest finishAMRequest = recordFactory .newRecordInstance(FinishApplicationMasterRequest.class); @@ -256,7 +289,9 @@ public abstract class AMSimulator extends TaskRunner.Task { conLauContext.setLocalResources(new HashMap<String, LocalResource>()); conLauContext.setServiceData(new HashMap<String, ByteBuffer>()); appSubContext.setAMContainerSpec(conLauContext); - appSubContext.setUnmanagedAM(true); + appSubContext.setResource(Resources + .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, + MR_AM_CONTAINER_RESOURCE_VCORES)); subAppRequest.setApplicationSubmissionContext(appSubContext); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); ugi.doAs(new PrivilegedExceptionAction<Object>() { @@ -267,22 +302,6 @@ public abstract class AMSimulator extends TaskRunner.Task { } }); LOG.info(MessageFormat.format("Submit a new application {0}", appId)); - - // waiting until application ACCEPTED - RMApp app = rm.getRMContext().getRMApps().get(appId); - while(app.getState() != RMAppState.ACCEPTED) { - Thread.sleep(10); - } - - // Waiting until application attempt reach LAUNCHED - // "Unmanaged AM must register after AM attempt reaches LAUNCHED state" - this.appAttemptId = rm.getRMContext().getRMApps().get(appId) - .getCurrentAppAttempt().getAppAttemptId(); - RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId) - .getCurrentAppAttempt(); - while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) { - Thread.sleep(10); - } } private void registerAM() @@ -335,7 +354,7 @@ public abstract class AMSimulator extends TaskRunner.Task { for (ContainerSimulator cs : csList) { String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname()); // check rack local - String rackname = rackHostNames[0]; + String rackname = "/" + rackHostNames[0]; if (rackLocalRequestMap.containsKey(rackname)) { rackLocalRequestMap.get(rackname).setNumContainers( rackLocalRequestMap.get(rackname).getNumContainers() + 1); @@ -383,4 +402,12 @@ public abstract class AMSimulator extends TaskRunner.Task { public int getNumTasks() { return totalContainers; } + + public ApplicationId getApplicationId() { + return appId; + } + + public ApplicationAttemptId getApplicationAttemptId() { + return appAttemptId; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index da267a1..e726b09 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.avro.Protocol; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; @@ -63,10 +64,10 @@ public class MRAMSimulator extends AMSimulator { private static final int PRIORITY_REDUCE = 10; private static final int PRIORITY_MAP = 20; - + // pending maps private LinkedList<ContainerSimulator> pendingMaps = - new LinkedList<ContainerSimulator>(); + new LinkedList<>(); // pending failed maps private LinkedList<ContainerSimulator> pendingFailedMaps = @@ -107,14 +108,9 @@ public class MRAMSimulator extends AMSimulator { private int mapTotal = 0; private int reduceFinished = 0; private int reduceTotal = 0; - // waiting for AM container - private boolean isAMContainerRunning = false; - private Container amContainer; + // finished private boolean isFinished = false; - // resource for AM container - private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; - private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1; public final Logger LOG = Logger.getLogger(MRAMSimulator.class); @@ -131,83 +127,34 @@ public class MRAMSimulator extends AMSimulator { for (ContainerSimulator cs : containerList) { if (cs.getType().equals("map")) { cs.setPriority(PRIORITY_MAP); - pendingMaps.add(cs); + allMaps.add(cs); } else if (cs.getType().equals("reduce")) { cs.setPriority(PRIORITY_REDUCE); - pendingReduces.add(cs); + allReduces.add(cs); } } - allMaps.addAll(pendingMaps); - allReduces.addAll(pendingReduces); - mapTotal = pendingMaps.size(); - reduceTotal = pendingReduces.size(); + + LOG.info(MessageFormat + .format("Added new job with {0} mapper and {1} reducers", + allMaps.size(), allReduces.size())); + + mapTotal = allMaps.size(); + reduceTotal = allReduces.size(); totalContainers = mapTotal + reduceTotal; } @Override - public void firstStep() throws Exception { - super.firstStep(); - - requestAMContainer(); - } - - /** - * send out request for AM container - */ - protected void requestAMContainer() - throws YarnException, IOException, InterruptedException { - List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); - ResourceRequest amRequest = createResourceRequest( - BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, - MR_AM_CONTAINER_RESOURCE_VCORES), - ResourceRequest.ANY, 1, 1); - ask.add(amRequest); - LOG.debug(MessageFormat.format("Application {0} sends out allocate " + - "request for its AM", appId)); - final AllocateRequest request = this.createAllocateRequest(ask); - - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps() - .get(appAttemptId.getApplicationId()) - .getRMAppAttempt(appAttemptId).getAMRMToken(); - ugi.addTokenIdentifier(token.decodeIdentifier()); - AllocateResponse response = ugi.doAs( - new PrivilegedExceptionAction<AllocateResponse>() { - @Override - public AllocateResponse run() throws Exception { - return rm.getApplicationMasterService().allocate(request); - } - }); - if (response != null) { - responseQueue.put(response); + public synchronized void notifyAMContainerLaunched(Container masterContainer) + throws Exception { + if (null != masterContainer) { + restart(); + super.notifyAMContainerLaunched(masterContainer); } } @Override @SuppressWarnings("unchecked") - protected void processResponseQueue() - throws InterruptedException, YarnException, IOException { - // Check whether receive the am container - if (!isAMContainerRunning) { - if (!responseQueue.isEmpty()) { - AllocateResponse response = responseQueue.take(); - if (response != null - && !response.getAllocatedContainers().isEmpty()) { - // Get AM container - Container container = response.getAllocatedContainers().get(0); - se.getNmMap().get(container.getNodeId()) - .addNewContainer(container, -1L); - // Start AM container - amContainer = container; - LOG.debug(MessageFormat.format("Application {0} starts its " + - "AM container ({1}).", appId, amContainer.getId())); - isAMContainerRunning = true; - } - } - return; - } - + protected void processResponseQueue() throws Exception { while (! responseQueue.isEmpty()) { AllocateResponse response = responseQueue.take(); @@ -228,12 +175,16 @@ public class MRAMSimulator extends AMSimulator { assignedReduces.remove(containerId); reduceFinished ++; finishedContainers ++; - } else { + } else if (amContainer.getId().equals(containerId)){ // am container released event isFinished = true; LOG.info(MessageFormat.format("Application {0} goes to " + "finish.", appId)); } + + if (mapFinished >= mapTotal && reduceFinished >= reduceTotal) { + lastStep(); + } } else { // container to be killed if (assignedMaps.containsKey(containerId)) { @@ -244,10 +195,9 @@ public class MRAMSimulator extends AMSimulator { LOG.debug(MessageFormat.format("Application {0} has one " + "reducer killed ({1}).", appId, containerId)); pendingFailedReduces.add(assignedReduces.remove(containerId)); - } else { + } else if (amContainer.getId().equals(containerId)){ LOG.info(MessageFormat.format("Application {0}'s AM is " + - "going to be killed. Restarting...", appId)); - restart(); + "going to be killed. Waiting for rescheduling...", appId)); } } } @@ -255,11 +205,8 @@ public class MRAMSimulator extends AMSimulator { // check finished if (isAMContainerRunning && - (mapFinished == mapTotal) && - (reduceFinished == reduceTotal)) { - // to release the AM container - se.getNmMap().get(amContainer.getNodeId()) - .cleanupContainer(amContainer.getId()); + (mapFinished >= mapTotal) && + (reduceFinished >= reduceTotal)) { isAMContainerRunning = false; LOG.debug(MessageFormat.format("Application {0} sends out event " + "to clean up its AM container.", appId)); @@ -293,21 +240,38 @@ public class MRAMSimulator extends AMSimulator { */ private void restart() throws YarnException, IOException, InterruptedException { - // clear - finishedContainers = 0; + // clear isFinished = false; - mapFinished = 0; - reduceFinished = 0; pendingFailedMaps.clear(); pendingMaps.clear(); pendingReduces.clear(); pendingFailedReduces.clear(); - pendingMaps.addAll(allMaps); - pendingReduces.addAll(pendingReduces); - isAMContainerRunning = false; + + // Only add totalMaps - finishedMaps + int added = 0; + for (ContainerSimulator cs : allMaps) { + if (added >= mapTotal - mapFinished) { + break; + } + pendingMaps.add(cs); + } + + // And same, only add totalReduces - finishedReduces + added = 0; + for (ContainerSimulator cs : allReduces) { + if (added >= reduceTotal - reduceFinished) { + break; + } + pendingReduces.add(cs); + } amContainer = null; - // resent am container request - requestAMContainer(); + } + + private List<ContainerSimulator> mergeLists(List<ContainerSimulator> left, List<ContainerSimulator> right) { + List<ContainerSimulator> list = new ArrayList<>(); + list.addAll(left); + list.addAll(right); + return list; } @Override @@ -319,44 +283,48 @@ public class MRAMSimulator extends AMSimulator { // send out request List<ResourceRequest> ask = null; - if (isAMContainerRunning) { - if (mapFinished != mapTotal) { - // map phase - if (! pendingMaps.isEmpty()) { - ask = packageRequests(pendingMaps, PRIORITY_MAP); - LOG.debug(MessageFormat.format("Application {0} sends out " + - "request for {1} mappers.", appId, pendingMaps.size())); - scheduledMaps.addAll(pendingMaps); - pendingMaps.clear(); - } else if (! pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) { - ask = packageRequests(pendingFailedMaps, PRIORITY_MAP); - LOG.debug(MessageFormat.format("Application {0} sends out " + - "requests for {1} failed mappers.", appId, - pendingFailedMaps.size())); - scheduledMaps.addAll(pendingFailedMaps); - pendingFailedMaps.clear(); - } - } else if (reduceFinished != reduceTotal) { - // reduce phase - if (! pendingReduces.isEmpty()) { - ask = packageRequests(pendingReduces, PRIORITY_REDUCE); - LOG.debug(MessageFormat.format("Application {0} sends out " + - "requests for {1} reducers.", appId, pendingReduces.size())); - scheduledReduces.addAll(pendingReduces); - pendingReduces.clear(); - } else if (! pendingFailedReduces.isEmpty() - && scheduledReduces.isEmpty()) { - ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE); - LOG.debug(MessageFormat.format("Application {0} sends out " + - "request for {1} failed reducers.", appId, - pendingFailedReduces.size())); - scheduledReduces.addAll(pendingFailedReduces); - pendingFailedReduces.clear(); - } + if (mapFinished != mapTotal) { + // map phase + if (!pendingMaps.isEmpty()) { + ask = packageRequests(mergeLists(pendingMaps, scheduledMaps), + PRIORITY_MAP); + LOG.debug(MessageFormat + .format("Application {0} sends out " + "request for {1} mappers.", + appId, pendingMaps.size())); + scheduledMaps.addAll(pendingMaps); + pendingMaps.clear(); + } else if (!pendingFailedMaps.isEmpty()) { + ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps), + PRIORITY_MAP); + LOG.debug(MessageFormat.format( + "Application {0} sends out " + "requests for {1} failed mappers.", + appId, pendingFailedMaps.size())); + scheduledMaps.addAll(pendingFailedMaps); + pendingFailedMaps.clear(); + } + } else if (reduceFinished != reduceTotal) { + // reduce phase + if (!pendingReduces.isEmpty()) { + ask = packageRequests(mergeLists(pendingReduces, scheduledReduces), + PRIORITY_REDUCE); + LOG.debug(MessageFormat + .format("Application {0} sends out " + "requests for {1} reducers.", + appId, pendingReduces.size())); + scheduledReduces.addAll(pendingReduces); + pendingReduces.clear(); + } else if (!pendingFailedReduces.isEmpty()) { + ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces), + PRIORITY_REDUCE); + LOG.debug(MessageFormat.format( + "Application {0} sends out " + "request for {1} failed reducers.", + appId, pendingFailedReduces.size())); + scheduledReduces.addAll(pendingFailedReduces); + pendingFailedReduces.clear(); } } + if (ask == null) { - ask = new ArrayList<ResourceRequest>(); + ask = new ArrayList<>(); } final AllocateRequest request = createAllocateRequest(ask); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java new file mode 100644 index 0000000..20cf3e5 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; + +import java.util.Map; + +public class MockAMLauncher extends ApplicationMasterLauncher + implements EventHandler<AMLauncherEvent> { + private static final Log LOG = LogFactory.getLog( + MockAMLauncher.class); + + Map<String, AMSimulator> amMap; + SLSRunner se; + + public MockAMLauncher(SLSRunner se, RMContext rmContext, + Map<String, AMSimulator> amMap) { + super(rmContext); + this.amMap = amMap; + this.se = se; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + // Do nothing + } + + @Override + protected void serviceStart() throws Exception { + // Do nothing + } + + private void setupAMRMToken(RMAppAttempt appAttempt) { + // Setup AMRMToken + Token<AMRMTokenIdentifier> amrmToken = + super.context.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttempt.getAppAttemptId()); + ((RMAppAttemptImpl) appAttempt).setAMRMToken(amrmToken); + } + + @Override + @SuppressWarnings("unchecked") + public void handle(AMLauncherEvent event) { + if (AMLauncherEventType.LAUNCH == event.getType()) { + ApplicationId appId = + event.getAppAttempt().getAppAttemptId().getApplicationId(); + + // find AMSimulator + for (AMSimulator ams : amMap.values()) { + if (ams.getApplicationId() != null && ams.getApplicationId().equals( + appId)) { + try { + Container amContainer = event.getAppAttempt().getMasterContainer(); + + setupAMRMToken(event.getAppAttempt()); + + // Notify RMAppAttempt to change state + super.context.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(), + RMAppAttemptEventType.LAUNCHED)); + + ams.notifyAMContainerLaunched( + event.getAppAttempt().getMasterContainer()); + LOG.info("Notify AM launcher launched:" + amContainer.getId()); + + se.getNmMap().get(amContainer.getNodeId()) + .addNewContainer(amContainer, 100000000L); + + return; + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + } + + throw new YarnRuntimeException( + "Didn't find any AMSimulator for applicationId=" + appId); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 8388273..cd4377e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -556,6 +556,30 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } } ); + metrics.register("variable.cluster.reserved.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + if(getRootQueueMetrics() == null) { + return 0L; + } else { + return getRootQueueMetrics().getReservedMB(); + } + } + } + ); + metrics.register("variable.cluster.reserved.vcores", + new Gauge<Integer>() { + @Override + public Integer getValue() { + if(getRootQueueMetrics() == null) { + return 0; + } else { + return getRootQueueMetrics().getReservedVirtualCores(); + } + } + } + ); } private void registerContainerAppNumMetrics() { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org