http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index f40f47d..f999dce 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -31,60 +31,77 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.tools.rumen.JobTraceReader; import org.apache.hadoop.tools.rumen.LoggedJob; import org.apache.hadoop.tools.rumen.LoggedTask; import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; +import org.apache.hadoop.tools.rumen.TaskAttemptInfo; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.NodeId; import 
org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; -import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; -import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; +import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler; +import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; +import org.apache.hadoop.yarn.sls.synthetic.SynthJob; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; import org.apache.hadoop.yarn.sls.utils.SLSUtils; +import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.Resources; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @Private @Unstable -public class SLSRunner { +public class SLSRunner extends Configured implements Tool { // RM, Runner private ResourceManager rm; private static TaskRunner runner = new TaskRunner(); private String[] inputTraces; - private Configuration conf; private Map<String, Integer> queueAppNumMap; - + // NM simulator private HashMap<NodeId, NMSimulator> nmMap; private int nmMemoryMB, nmVCores; private String nodeFile; - + // AM simulator private int AM_ID; private Map<String, AMSimulator> amMap; @@ -99,49 +116,92 @@ public class SLSRunner { // other simulation information private int numNMs, numRacks, numAMs, numTasks; private long maxRuntime; - public final static Map<String, Object> simulateInfoMap = + + private final static Map<String, Object> simulateInfoMap = new HashMap<String, Object>(); // logger public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class); - // input traces, input-rumen or input-sls - private boolean isSLS; - - public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile, - String outputDir, Set<String> trackedApps, - boolean printsimulation) - throws IOException, ClassNotFoundException { - this.isSLS = isSLS; - this.inputTraces = inputTraces.clone(); - this.nodeFile = nodeFile; - this.trackedApps = trackedApps; - this.printSimulation = printsimulation; - metricsOutputDir = outputDir; - - nmMap = new HashMap<NodeId, NMSimulator>(); - queueAppNumMap = new HashMap<String, Integer>(); - amMap = new HashMap<String, AMSimulator>(); - amClassMap = new HashMap<String, Class>(); - + private final static int DEFAULT_MAPPER_PRIORITY = 20; + private final static int DEFAULT_REDUCER_PRIORITY = 10; + + private static boolean exitAtTheFinish = false; + + /** + * The type of trace in input. 
+ */ + public enum TraceType { + SLS, RUMEN, SYNTH + } + + private TraceType inputType; + private SynthTraceJobProducer stjp; + + public SLSRunner() throws ClassNotFoundException { + Configuration tempConf = new Configuration(false); + init(tempConf); + } + + public SLSRunner(Configuration tempConf) throws ClassNotFoundException { + init(tempConf); + } + + @Override + public void setConf(Configuration conf) { + if (null != conf) { + // Override setConf to make sure all conf added load sls-runner.xml, see + // YARN-6560 + conf.addResource("sls-runner.xml"); + } + super.setConf(conf); + } + + private void init(Configuration tempConf) throws ClassNotFoundException { + nmMap = new HashMap<>(); + queueAppNumMap = new HashMap<>(); + amMap = new ConcurrentHashMap<>(); + amClassMap = new HashMap<>(); + // runner configuration - conf = new Configuration(false); - conf.addResource("sls-runner.xml"); + setConf(tempConf); + // runner - int poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, - SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSRunner.runner.setQueueSize(poolSize); // <AMType, Class> map - for (Map.Entry e : conf) { + for (Map.Entry e : tempConf) { String key = e.getKey().toString(); if (key.startsWith(SLSConfiguration.AM_TYPE)) { String amType = key.substring(SLSConfiguration.AM_TYPE.length()); - amClassMap.put(amType, Class.forName(conf.get(key))); + amClassMap.put(amType, Class.forName(tempConf.get(key))); } } } - - public void start() throws Exception { + + /** + * @return an unmodifiable view of the simulated info map. 
+ */ + public static Map<String, Object> getSimulateInfoMap() { + return Collections.unmodifiableMap(simulateInfoMap); + } + + public void setSimulationParams(TraceType inType, String[] inTraces, + String nodes, String outDir, Set<String> trackApps, + boolean printsimulation) throws IOException, ClassNotFoundException { + + this.inputType = inType; + this.inputTraces = inTraces.clone(); + this.nodeFile = nodes; + this.trackedApps = trackApps; + this.printSimulation = printsimulation; + metricsOutputDir = outDir; + + } + + public void start() throws IOException, ClassNotFoundException, YarnException, + InterruptedException { // start resource manager startRM(); // start node managers @@ -149,10 +209,10 @@ public class SLSRunner { // start application masters startAM(); // set queue & tracked apps information - ((SchedulerWrapper) rm.getResourceScheduler()) - .setQueueSet(this.queueAppNumMap.keySet()); - ((SchedulerWrapper) rm.getResourceScheduler()) - .setTrackedAppSet(this.trackedApps); + ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() + .setQueueSet(this.queueAppNumMap.keySet()); + ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() + .setTrackedAppSet(this.trackedApps); // print out simulation info printSimulationInfo(); // blocked until all nodes RUNNING @@ -160,66 +220,92 @@ public class SLSRunner { // starting the runner once everything is ready to go, runner.start(); } - - private void startRM() throws IOException, ClassNotFoundException { - Configuration rmConf = new YarnConfiguration(); + + private void startRM() throws ClassNotFoundException, YarnException { + Configuration rmConf = new YarnConfiguration(getConf()); String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); - // For CapacityScheduler we use a sub-classing instead of wrapping - // to allow scheduler-specific invocations from monitors to work - // this can be used for other schedulers as well if we care to - // exercise/track behaviors that are not common to the 
scheduler api - if(Class.forName(schedulerClass) == CapacityScheduler.class) { + if (Class.forName(schedulerClass) == CapacityScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, SLSCapacityScheduler.class.getName()); rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, ProportionalCapacityPreemptionPolicy.class.getName()); - } else { + } else if (Class.forName(schedulerClass) == FairScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, - ResourceSchedulerWrapper.class.getName()); - rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass); + SLSFairScheduler.class.getName()); + } else if (Class.forName(schedulerClass) == FifoScheduler.class) { + // TODO add support for FifoScheduler + throw new YarnException("Fifo Scheduler is not supported yet."); } rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); - rm = new ResourceManager(); + + final SLSRunner se = this; + rm = new ResourceManager() { + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new MockAMLauncher(se, this.rmContext, amMap); + } + }; + + // Across runs of parametrized tests, the JvmMetrics objects is retained, + // but is not registered correctly + JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null); + jvmMetrics.registerIfNeeded(); + + // Init and start the actual ResourceManager rm.init(rmConf); rm.start(); } private void startNM() throws YarnException, IOException { // nm configuration - nmMemoryMB = conf.getInt(SLSConfiguration.NM_MEMORY_MB, - SLSConfiguration.NM_MEMORY_MB_DEFAULT); - nmVCores = conf.getInt(SLSConfiguration.NM_VCORES, - SLSConfiguration.NM_VCORES_DEFAULT); - int heartbeatInterval = conf.getInt( - SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, + nmMemoryMB = getConf().getInt(SLSConfiguration.NM_MEMORY_MB, + SLSConfiguration.NM_MEMORY_MB_DEFAULT); + nmVCores = getConf().getInt(SLSConfiguration.NM_VCORES, + 
SLSConfiguration.NM_VCORES_DEFAULT); + int heartbeatInterval = + getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT); // nm information (fetch from topology file, or from sls/rumen json file) Set<String> nodeSet = new HashSet<String>(); if (nodeFile.isEmpty()) { - if (isSLS) { - for (String inputTrace : inputTraces) { + for (String inputTrace : inputTraces) { + + switch (inputType) { + case SLS: nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace)); - } - } else { - for (String inputTrace : inputTraces) { + break; + case RUMEN: nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace)); + break; + case SYNTH: + stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + nodeSet.addAll(SLSUtils.generateNodes(stjp.getNumNodes(), + stjp.getNumNodes()/stjp.getNodesPerRack())); + break; + default: + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); } } - } else { nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile)); } + + if (nodeSet.size() == 0) { + throw new YarnException("No node! 
Please configure nodes."); + } + // create NM simulators Random random = new Random(); Set<String> rackSet = new HashSet<String>(); for (String hostName : nodeSet) { // we randomize the heartbeat start time from zero to 1 interval NMSimulator nm = new NMSimulator(); - nm.init(hostName, nmMemoryMB, nmVCores, - random.nextInt(heartbeatInterval), heartbeatInterval, rm); + nm.init(hostName, nmMemoryMB, nmVCores, random.nextInt(heartbeatInterval), + heartbeatInterval, rm); nmMap.put(nm.getNode().getNodeID(), nm); runner.schedule(nm); rackSet.add(nm.getNode().getRackName()); @@ -234,7 +320,7 @@ public class SLSRunner { int numRunningNodes = 0; for (RMNode node : rm.getRMContext().getRMNodes().values()) { if (node.getState() == NodeState.RUNNING) { - numRunningNodes ++; + numRunningNodes++; } } if (numRunningNodes == numNMs) { @@ -250,209 +336,433 @@ public class SLSRunner { @SuppressWarnings("unchecked") private void startAM() throws YarnException, IOException { - // application/container configuration - int heartbeatInterval = conf.getInt( - SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, - SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); - int containerMemoryMB = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB, - SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); - int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES, - SLSConfiguration.CONTAINER_VCORES_DEFAULT); - Resource containerResource = - BuilderUtils.newResource(containerMemoryMB, containerVCores); - - // application workload - if (isSLS) { - startAMFromSLSTraces(containerResource, heartbeatInterval); - } else { - startAMFromRumenTraces(containerResource, heartbeatInterval); + switch (inputType) { + case SLS: + for (String inputTrace : inputTraces) { + startAMFromSLSTrace(inputTrace); + } + break; + case RUMEN: + long baselineTimeMS = 0; + for (String inputTrace : inputTraces) { + startAMFromRumenTrace(inputTrace, baselineTimeMS); + } + break; + case SYNTH: + startAMFromSynthGenerator(); + break; + 
default: + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); } + numAMs = amMap.size(); remainingApps = numAMs; } /** - * parse workload information from sls trace files + * Parse workload from a SLS trace file. */ @SuppressWarnings("unchecked") - private void startAMFromSLSTraces(Resource containerResource, - int heartbeatInterval) throws IOException { - // parse from sls traces + private void startAMFromSLSTrace(String inputTrace) throws IOException { JsonFactory jsonF = new JsonFactory(); ObjectMapper mapper = new ObjectMapper(); - for (String inputTrace : inputTraces) { - Reader input = - new InputStreamReader(new FileInputStream(inputTrace), "UTF-8"); - try { - Iterator<Map> i = mapper.readValues(jsonF.createJsonParser(input), - Map.class); - while (i.hasNext()) { - Map jsonJob = i.next(); - - // load job information - long jobStartTime = Long.parseLong( - jsonJob.get("job.start.ms").toString()); - long jobFinishTime = Long.parseLong( - jsonJob.get("job.end.ms").toString()); - - String user = (String) jsonJob.get("job.user"); - if (user == null) user = "default"; - String queue = jsonJob.get("job.queue.name").toString(); - - String oldAppId = jsonJob.get("job.id").toString(); - boolean isTracked = trackedApps.contains(oldAppId); - int queueSize = queueAppNumMap.containsKey(queue) ? 
- queueAppNumMap.get(queue) : 0; - queueSize ++; - queueAppNumMap.put(queue, queueSize); - // tasks - List tasks = (List) jsonJob.get("job.tasks"); - if (tasks == null || tasks.size() == 0) { - continue; - } - List<ContainerSimulator> containerList = - new ArrayList<ContainerSimulator>(); - for (Object o : tasks) { - Map jsonTask = (Map) o; - String hostname = jsonTask.get("container.host").toString(); - long taskStart = Long.parseLong( - jsonTask.get("container.start.ms").toString()); - long taskFinish = Long.parseLong( - jsonTask.get("container.end.ms").toString()); - long lifeTime = taskFinish - taskStart; - - // Set memory and vcores from job trace file - Resource res = Resources.clone(containerResource); - if (jsonTask.containsKey("container.memory")) { - int containerMemory = Integer.parseInt( - jsonTask.get("container.memory").toString()); - res.setMemorySize(containerMemory); - } - - if (jsonTask.containsKey("container.vcores")) { - int containerVCores = Integer.parseInt( - jsonTask.get("container.vcores").toString()); - res.setVirtualCores(containerVCores); - } - - int priority = Integer.parseInt( - jsonTask.get("container.priority").toString()); - String type = jsonTask.get("container.type").toString(); - containerList.add(new ContainerSimulator(res, - lifeTime, hostname, priority, type)); - } - - // create a new AM - String amType = jsonJob.get("am.type").toString(); - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(amType), new Configuration()); - if (amSim != null) { - amSim.init(AM_ID++, heartbeatInterval, containerList, rm, - this, jobStartTime, jobFinishTime, user, queue, - isTracked, oldAppId); - runner.schedule(amSim); - maxRuntime = Math.max(maxRuntime, jobFinishTime); - numTasks += containerList.size(); - amMap.put(oldAppId, amSim); - } + + try (Reader input = new InputStreamReader( + new FileInputStream(inputTrace), "UTF-8")) { + Iterator<Map> jobIter = mapper.readValues( + jsonF.createParser(input), Map.class); 
+ + while (jobIter.hasNext()) { + try { + createAMForJob(jobIter.next()); + } catch (Exception e) { + LOG.error("Failed to create an AM: {}", e.getMessage()); } - } finally { - input.close(); } } } + private void createAMForJob(Map jsonJob) throws YarnException { + long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString()); + + long jobFinishTime = 0; + if (jsonJob.containsKey("job.end.ms")) { + jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString()); + } + + String user = (String) jsonJob.get("job.user"); + if (user == null) { + user = "default"; + } + + String queue = jsonJob.get("job.queue.name").toString(); + increaseQueueAppNum(queue); + + String amType = (String)jsonJob.get("am.type"); + if (amType == null) { + amType = SLSUtils.DEFAULT_JOB_TYPE; + } + + int jobCount = 1; + if (jsonJob.containsKey("job.count")) { + jobCount = Integer.parseInt(jsonJob.get("job.count").toString()); + } + jobCount = Math.max(jobCount, 1); + + String oldAppId = (String)jsonJob.get("job.id"); + // Job id is generated automatically if this job configuration allows + // multiple job instances + if(jobCount > 1) { + oldAppId = null; + } + + for (int i = 0; i < jobCount; i++) { + runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, + getTaskContainers(jsonJob), null, getAMContainerResource(jsonJob)); + } + } + + private List<ContainerSimulator> getTaskContainers(Map jsonJob) + throws YarnException { + List<ContainerSimulator> containers = new ArrayList<>(); + List tasks = (List) jsonJob.get("job.tasks"); + if (tasks == null || tasks.size() == 0) { + throw new YarnException("No task for the job!"); + } + + for (Object o : tasks) { + Map jsonTask = (Map) o; + + String hostname = (String) jsonTask.get("container.host"); + + long duration = 0; + if (jsonTask.containsKey("duration.ms")) { + duration = Integer.parseInt(jsonTask.get("duration.ms").toString()); + } else if (jsonTask.containsKey("container.start.ms") && + 
jsonTask.containsKey("container.end.ms")) { + long taskStart = Long.parseLong(jsonTask.get("container.start.ms") + .toString()); + long taskFinish = Long.parseLong(jsonTask.get("container.end.ms") + .toString()); + duration = taskFinish - taskStart; + } + if (duration <= 0) { + throw new YarnException("Duration of a task shouldn't be less or equal" + + " to 0!"); + } + + Resource res = getDefaultContainerResource(); + if (jsonTask.containsKey("container.memory")) { + int containerMemory = + Integer.parseInt(jsonTask.get("container.memory").toString()); + res.setMemorySize(containerMemory); + } + + if (jsonTask.containsKey("container.vcores")) { + int containerVCores = + Integer.parseInt(jsonTask.get("container.vcores").toString()); + res.setVirtualCores(containerVCores); + } + + int priority = DEFAULT_MAPPER_PRIORITY; + if (jsonTask.containsKey("container.priority")) { + priority = Integer.parseInt(jsonTask.get("container.priority") + .toString()); + } + + String type = "map"; + if (jsonTask.containsKey("container.type")) { + type = jsonTask.get("container.type").toString(); + } + + int count = 1; + if (jsonTask.containsKey("count")) { + count = Integer.parseInt(jsonTask.get("count").toString()); + } + count = Math.max(count, 1); + + for (int i = 0; i < count; i++) { + containers.add( + new ContainerSimulator(res, duration, hostname, priority, type)); + } + } + + return containers; + } + /** - * parse workload information from rumen trace files + * Parse workload from a rumen trace file. 
*/ @SuppressWarnings("unchecked") - private void startAMFromRumenTraces(Resource containerResource, - int heartbeatInterval) - throws IOException { + private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS) + throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); + File fin = new File(inputTrace); + + try (JobTraceReader reader = new JobTraceReader( + new Path(fin.getAbsolutePath()), conf)) { + LoggedJob job = reader.getNext(); + + while (job != null) { + try { + createAMForJob(job, baselineTimeMS); + } catch (Exception e) { + LOG.error("Failed to create an AM: {}", e.getMessage()); + } + + job = reader.getNext(); + } + } + } + + private void createAMForJob(LoggedJob job, long baselineTimeMs) + throws YarnException { + String user = job.getUser() == null ? "default" : + job.getUser().getValue(); + String jobQueue = job.getQueue().getValue(); + String oldJobId = job.getJobID().toString(); + long jobStartTimeMS = job.getSubmitTime(); + long jobFinishTimeMS = job.getFinishTime(); + if (baselineTimeMs == 0) { + baselineTimeMs = job.getSubmitTime(); + } + jobStartTimeMS -= baselineTimeMs; + jobFinishTimeMS -= baselineTimeMs; + if (jobStartTimeMS < 0) { + LOG.warn("Warning: reset job {} start time to 0.", oldJobId); + jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; + jobStartTimeMS = 0; + } + + increaseQueueAppNum(jobQueue); + + List<ContainerSimulator> containerList = new ArrayList<>(); + // mapper + for (LoggedTask mapTask : job.getMapTasks()) { + if (mapTask.getAttempts().size() == 0) { + throw new YarnException("Invalid map task, no attempt for a mapper!"); + } + LoggedTaskAttempt taskAttempt = + mapTask.getAttempts().get(mapTask.getAttempts().size() - 1); + String hostname = taskAttempt.getHostName().getValue(); + long containerLifeTime = taskAttempt.getFinishTime() - + taskAttempt.getStartTime(); + containerList.add( + new ContainerSimulator(getDefaultContainerResource(), + containerLifeTime, 
hostname, DEFAULT_MAPPER_PRIORITY, "map")); + } + + // reducer + for (LoggedTask reduceTask : job.getReduceTasks()) { + if (reduceTask.getAttempts().size() == 0) { + throw new YarnException( + "Invalid reduce task, no attempt for a reducer!"); + } + LoggedTaskAttempt taskAttempt = + reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1); + String hostname = taskAttempt.getHostName().getValue(); + long containerLifeTime = taskAttempt.getFinishTime() - + taskAttempt.getStartTime(); + containerList.add( + new ContainerSimulator(getDefaultContainerResource(), + containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce")); + } + + // Only supports the default job type currently + runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, + jobStartTimeMS, jobFinishTimeMS, containerList, null, + getAMContainerResource(null)); + } + + private Resource getDefaultContainerResource() { + int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, + SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); + int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES, + SLSConfiguration.CONTAINER_VCORES_DEFAULT); + return Resources.createResource(containerMemory, containerVCores); + } + + /** + * parse workload information from synth-generator trace files. + */ + @SuppressWarnings("unchecked") + private void startAMFromSynthGenerator() throws YarnException, IOException { + Configuration localConf = new Configuration(); + localConf.set("fs.defaultFS", "file:///"); long baselineTimeMS = 0; - for (String inputTrace : inputTraces) { - File fin = new File(inputTrace); - JobTraceReader reader = new JobTraceReader( - new Path(fin.getAbsolutePath()), conf); - try { - LoggedJob job = null; - while ((job = reader.getNext()) != null) { - // only support MapReduce currently - String jobType = "mapreduce"; - String user = job.getUser() == null ? 
- "default" : job.getUser().getValue(); - String jobQueue = job.getQueue().getValue(); - String oldJobId = job.getJobID().toString(); - long jobStartTimeMS = job.getSubmitTime(); - long jobFinishTimeMS = job.getFinishTime(); - if (baselineTimeMS == 0) { - baselineTimeMS = jobStartTimeMS; - } - jobStartTimeMS -= baselineTimeMS; - jobFinishTimeMS -= baselineTimeMS; - if (jobStartTimeMS < 0) { - LOG.warn("Warning: reset job {} start time to 0.", oldJobId); - jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; - jobStartTimeMS = 0; - } - - boolean isTracked = trackedApps.contains(oldJobId); - int queueSize = queueAppNumMap.containsKey(jobQueue) ? - queueAppNumMap.get(jobQueue) : 0; - queueSize ++; - queueAppNumMap.put(jobQueue, queueSize); - - List<ContainerSimulator> containerList = - new ArrayList<ContainerSimulator>(); - // map tasks - for(LoggedTask mapTask : job.getMapTasks()) { - if (mapTask.getAttempts().size() == 0) { - continue; - } - LoggedTaskAttempt taskAttempt = mapTask.getAttempts() - .get(mapTask.getAttempts().size() - 1); - String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); - containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, 10, "map")); - } - - // reduce tasks - for(LoggedTask reduceTask : job.getReduceTasks()) { - if (reduceTask.getAttempts().size() == 0) { - continue; - } - LoggedTaskAttempt taskAttempt = reduceTask.getAttempts() - .get(reduceTask.getAttempts().size() - 1); - String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); - containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, 20, "reduce")); - } - - // create a new AM - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(jobType), conf); - if (amSim != null) { - amSim.init(AM_ID ++, heartbeatInterval, 
containerList, - rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, - isTracked, oldJobId); - runner.schedule(amSim); - maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); - numTasks += containerList.size(); - amMap.put(oldJobId, amSim); - } + + // reservations use wall clock time, so need to have a reference for that + UTCClock clock = new UTCClock(); + long now = clock.getTime(); + + try { + + // if we use the nodeFile this could have been not initialized yet. + if (stjp == null) { + stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + } + + SynthJob job = null; + // we use stjp, a reference to the job producer instantiated during node + // creation + while ((job = (SynthJob) stjp.getNextJob()) != null) { + // only support MapReduce currently + String user = job.getUser(); + String jobQueue = job.getQueueName(); + String oldJobId = job.getJobID().toString(); + long jobStartTimeMS = job.getSubmissionTime(); + + // CARLO: Finish time is only used for logging, omit for now + long jobFinishTimeMS = -1L; + + if (baselineTimeMS == 0) { + baselineTimeMS = jobStartTimeMS; + } + jobStartTimeMS -= baselineTimeMS; + jobFinishTimeMS -= baselineTimeMS; + if (jobStartTimeMS < 0) { + LOG.warn("Warning: reset job {} start time to 0.", oldJobId); + jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; + jobStartTimeMS = 0; + } + + increaseQueueAppNum(jobQueue); + + List<ContainerSimulator> containerList = + new ArrayList<ContainerSimulator>(); + ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet()); + Random rand = new Random(stjp.getSeed()); + + Resource maxMapRes = Resource.newInstance(0, 0); + long maxMapDur = 0; + // map tasks + for (int i = 0; i < job.getNumberMaps(); i++) { + TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0); + RMNode node = nmMap + .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = 
tai.getRuntime(); + Resource containerResource = + Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), + (int) tai.getTaskInfo().getTaskVCores()); + containerList.add(new ContainerSimulator(containerResource, + containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map")); + maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource); + maxMapDur = + containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur; + + } + + Resource maxRedRes = Resource.newInstance(0, 0); + long maxRedDur = 0; + // reduce tasks + for (int i = 0; i < job.getNumberReduces(); i++) { + TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0); + RMNode node = nmMap + .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = tai.getRuntime(); + Resource containerResource = + Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), + (int) tai.getTaskInfo().getTaskVCores()); + containerList.add(new ContainerSimulator(containerResource, + containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce")); + maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource); + maxRedDur = + containerLifeTime > maxRedDur ? 
containerLifeTime : maxRedDur; + + } + + // generating reservations for the jobs that require them + + ReservationSubmissionRequest rr = null; + if (job.hasDeadline()) { + ReservationId reservationId = + ReservationId.newInstance(this.rm.getStartTime(), AM_ID); + + rr = ReservationClientUtil.createMRReservation(reservationId, + "reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur, + maxRedRes, job.getNumberReduces(), maxRedDur, + now + jobStartTimeMS, now + job.getDeadline(), + job.getQueueName()); + } - } finally { - reader.close(); + + runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, + jobStartTimeMS, jobFinishTimeMS, containerList, rr, + getAMContainerResource(null)); + } + } finally { + stjp.close(); + } + + } + + private Resource getAMContainerResource(Map jsonJob) { + Resource amContainerResource = + SLSConfiguration.getAMContainerResource(getConf()); + + if (jsonJob == null) { + return amContainerResource; + } + + if (jsonJob.containsKey("am.memory")) { + amContainerResource.setMemorySize( + Long.parseLong(jsonJob.get("am.memory").toString())); + } + + if (jsonJob.containsKey("am.vcores")) { + amContainerResource.setVirtualCores( + Integer.parseInt(jsonJob.get("am.vcores").toString())); + } + return amContainerResource; + } + + private void increaseQueueAppNum(String queue) throws YarnException { + SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); + String queueName = wrapper.getRealQueueName(queue); + Integer appNum = queueAppNumMap.get(queueName); + if (appNum == null) { + appNum = 1; + } else { + appNum++; + } + + queueAppNumMap.put(queueName, appNum); + } + + @SuppressWarnings("unchecked") + private void runNewAM(String jobType, String user, + String jobQueue, String oldJobId, long jobStartTimeMS, + long jobFinishTimeMS, List<ContainerSimulator> containerList, + ReservationSubmissionRequest rr, Resource amContainerResource) { + + AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( + 
amClassMap.get(jobType), new Configuration()); + + if (amSim != null) { + int heartbeatInterval = getConf().getInt( + SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, + SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); + boolean isTracked = trackedApps.contains(oldJobId); + + if (oldJobId == null) { + oldJobId = Integer.toString(AM_ID); } + AM_ID++; + + amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, + jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, rr, + runner.getStartTimeMS(), amContainerResource); + runner.schedule(amSim); + maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); + numTasks += containerList.size(); + amMap.put(oldJobId, amSim); } } - + private void printSimulationInfo() { if (printSimulation) { // node @@ -468,7 +778,7 @@ public class SLSRunner { LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks"); for (Map.Entry<String, AMSimulator> entry : amMap.entrySet()) { AMSimulator am = entry.getValue(); - LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() + LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() + "\t" + am.getDuration() + "\t" + am.getNumTasks()); } LOG.info("------------------------------------"); @@ -502,69 +812,125 @@ public class SLSRunner { return nmMap; } - public static TaskRunner getRunner() { - return runner; - } - public static void decreaseRemainingApps() { - remainingApps --; + remainingApps--; if (remainingApps == 0) { LOG.info("SLSRunner tears down."); - System.exit(0); + if (exitAtTheFinish) { + System.exit(0); + } } } - public static void main(String args[]) throws Exception { + public void stop() throws InterruptedException { + rm.stop(); + runner.stop(); + } + + public int run(final String[] argv) throws IOException, InterruptedException, + ParseException, ClassNotFoundException, YarnException { + Options options = new Options(); + + // Left for compatibility options.addOption("inputrumen", true, "input rumen files"); options.addOption("inputsls", true, "input 
sls files"); + + // New more general format + options.addOption("tracetype", true, "the type of trace"); + options.addOption("tracelocation", true, "input trace files"); + options.addOption("nodes", true, "input topology"); options.addOption("output", true, "output directory"); options.addOption("trackjobs", true, - "jobs to be tracked during simulating"); + "jobs to be tracked during simulating"); options.addOption("printsimulation", false, - "print out simulation information"); - + "print out simulation information"); + CommandLineParser parser = new GnuParser(); - CommandLine cmd = parser.parse(options, args); + CommandLine cmd = parser.parse(options, argv); + + String traceType = null; + String traceLocation = null; + + // compatibility with old commandline + if (cmd.hasOption("inputrumen")) { + traceType = "RUMEN"; + traceLocation = cmd.getOptionValue("inputrumen"); + } + if (cmd.hasOption("inputsls")) { + traceType = "SLS"; + traceLocation = cmd.getOptionValue("inputsls"); + } + + if (cmd.hasOption("tracetype")) { + traceType = cmd.getOptionValue("tracetype"); + traceLocation = cmd.getOptionValue("tracelocation"); + } - String inputRumen = cmd.getOptionValue("inputrumen"); - String inputSLS = cmd.getOptionValue("inputsls"); String output = cmd.getOptionValue("output"); - - if ((inputRumen == null && inputSLS == null) || output == null) { - System.err.println(); - System.err.println("ERROR: Missing input or output file"); - System.err.println(); - System.err.println("Options: -inputrumen|-inputsls FILE,FILE... " + - "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " + - "[-printsimulation]"); - System.err.println(); - System.exit(1); - } - + File outputFile = new File(output); - if (! outputFile.exists() - && ! 
outputFile.mkdirs()) { + if (!outputFile.exists() && !outputFile.mkdirs()) { System.err.println("ERROR: Cannot create output directory " - + outputFile.getAbsolutePath()); - System.exit(1); + + outputFile.getAbsolutePath()); + throw new YarnException("Cannot create output directory"); } - + Set<String> trackedJobSet = new HashSet<String>(); if (cmd.hasOption("trackjobs")) { String trackjobs = cmd.getOptionValue("trackjobs"); String jobIds[] = trackjobs.split(","); trackedJobSet.addAll(Arrays.asList(jobIds)); } - - String nodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : ""; - boolean isSLS = inputSLS != null; - String inputFiles[] = isSLS ? inputSLS.split(",") : inputRumen.split(","); - SLSRunner sls = new SLSRunner(isSLS, inputFiles, nodeFile, output, + String tempNodeFile = + cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : ""; + + TraceType tempTraceType = TraceType.SLS; + switch (traceType) { + case "SLS": + tempTraceType = TraceType.SLS; + break; + case "RUMEN": + tempTraceType = TraceType.RUMEN; + break; + + case "SYNTH": + tempTraceType = TraceType.SYNTH; + break; + default: + printUsage(); + throw new YarnException("Misconfigured input"); + } + + String[] inputFiles = traceLocation.split(","); + + setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output, trackedJobSet, cmd.hasOption("printsimulation")); - sls.start(); + + start(); + + return 0; } + + public static void main(String[] argv) throws Exception { + exitAtTheFinish = true; + ToolRunner.run(new Configuration(), new SLSRunner(), argv); + } + + static void printUsage() { + System.err.println(); + System.err.println("ERROR: Wrong tracetype"); + System.err.println(); + System.err.println( + "Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... " + + "(deprecated alternative options --inputsls FILE, FILE,... " + + " | --inputrumen FILE,FILE,...)" + + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] 
" + + "[-printsimulation]"); + System.err.println(); + } + }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index e536cb6..6171154 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.sls.appmaster; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -34,25 +35,24 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords - .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; - -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import 
org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -60,12 +60,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.util.Records; - import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.apache.hadoop.yarn.sls.SLSRunner; @@ -90,7 +86,7 @@ public abstract class AMSimulator extends TaskRunner.Task { RecordFactoryProvider.getRecordFactory(null); // response queue protected final BlockingQueue<AllocateResponse> responseQueue; - protected int RESPONSE_ID = 1; + private int responseId = 0; // user name protected String user; // 
queue name @@ -98,6 +94,7 @@ public abstract class AMSimulator extends TaskRunner.Task { // am type protected String amtype; // job start/end time + private long baselineTimeMS; protected long traceStartTimeMS; protected long traceFinishTimeMS; protected long simulateStartTimeMS; @@ -107,28 +104,41 @@ public abstract class AMSimulator extends TaskRunner.Task { // progress protected int totalContainers; protected int finishedContainers; + + // waiting for AM container + volatile boolean isAMContainerRunning = false; + volatile Container amContainer; - protected final Logger LOG = LoggerFactory.getLogger(AMSimulator.class); + private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class); + + private Resource amContainerResource; + + private ReservationSubmissionRequest reservationRequest; public AMSimulator() { - this.responseQueue = new LinkedBlockingQueue<AllocateResponse>(); + this.responseQueue = new LinkedBlockingQueue<>(); } - public void init(int id, int heartbeatInterval, - List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId) { - super.init(traceStartTime, traceStartTime + 1000000L * heartbeatInterval, - heartbeatInterval); - this.user = user; - this.rm = rm; - this.se = se; - this.user = user; - this.queue = queue; - this.oldAppId = oldAppId; - this.isTracked = isTracked; - this.traceStartTimeMS = traceStartTime; - this.traceFinishTimeMS = traceFinishTime; + @SuppressWarnings("checkstyle:parameternumber") + public void init(int heartbeatInterval, + List<ContainerSimulator> containerList, ResourceManager resourceManager, + SLSRunner slsRunnner, long startTime, long finishTime, String simUser, + String simQueue, boolean tracked, String oldApp, + ReservationSubmissionRequest rr, long baseTimeMS, + Resource amContainerResource) { + super.init(startTime, startTime + 1000000L * heartbeatInterval, + heartbeatInterval); + 
this.user = simUser; + this.rm = resourceManager; + this.se = slsRunnner; + this.queue = simQueue; + this.oldAppId = oldApp; + this.isTracked = tracked; + this.baselineTimeMS = baseTimeMS; + this.traceStartTimeMS = startTime; + this.traceFinishTimeMS = finishTime; + this.reservationRequest = rr; + this.amContainerResource = amContainerResource; } /** @@ -136,29 +146,66 @@ public abstract class AMSimulator extends TaskRunner.Task { */ @Override public void firstStep() throws Exception { - simulateStartTimeMS = System.currentTimeMillis() - - SLSRunner.getRunner().getStartTimeMS(); + simulateStartTimeMS = System.currentTimeMillis() - baselineTimeMS; - // submit application, waiting until ACCEPTED - submitApp(); + ReservationId reservationId = null; - // register application master - registerAM(); + // submit a reservation if one is required, exceptions naturally happen + // when the reservation does not fit, catch, log, and move on running job + // without reservation. + try { + reservationId = submitReservationWhenSpecified(); + } catch (UndeclaredThrowableException y) { + LOG.warn("Unable to place reservation: " + y.getMessage()); + } + + // submit application, waiting until ACCEPTED + submitApp(reservationId); // track app metrics trackApp(); } + public synchronized void notifyAMContainerLaunched(Container masterContainer) + throws Exception { + this.amContainer = masterContainer; + this.appAttemptId = masterContainer.getId().getApplicationAttemptId(); + registerAM(); + isAMContainerRunning = true; + } + + private ReservationId submitReservationWhenSpecified() + throws IOException, InterruptedException { + if (reservationRequest != null) { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws YarnException, IOException { + rm.getClientRMService().submitReservation(reservationRequest); + LOG.info("RESERVATION SUCCESSFULLY SUBMITTED " + + 
reservationRequest.getReservationId()); + return null; + + } + }); + return reservationRequest.getReservationId(); + } else { + return null; + } + } + @Override public void middleStep() throws Exception { - // process responses in the queue - processResponseQueue(); - - // send out request - sendContainerRequest(); - - // check whether finish - checkStop(); + if (isAMContainerRunning) { + // process responses in the queue + processResponseQueue(); + + // send out request + sendContainerRequest(); + + // check whether finish + checkStop(); + } } @Override @@ -168,6 +215,22 @@ public abstract class AMSimulator extends TaskRunner.Task { if (isTracked) { untrackApp(); } + + // Finish AM container + if (amContainer != null) { + LOG.info("AM container = {} reported to finish", amContainer.getId()); + se.getNmMap().get(amContainer.getNodeId()).cleanupContainer( + amContainer.getId()); + } else { + LOG.info("AM container is null"); + } + + if (null == appAttemptId) { + // If appAttemptId == null, AM is not launched from RM's perspective, so + // it's unnecessary to finish am as well + return; + } + // unregister application master final FinishApplicationMasterRequest finishAMRequest = recordFactory .newRecordInstance(FinishApplicationMasterRequest.class); @@ -187,13 +250,14 @@ public abstract class AMSimulator extends TaskRunner.Task { } }); - simulateFinishTimeMS = System.currentTimeMillis() - - SLSRunner.getRunner().getStartTimeMS(); + simulateFinishTimeMS = System.currentTimeMillis() - baselineTimeMS; // record job running information - ((SchedulerWrapper)rm.getResourceScheduler()) - .addAMRuntime(appId, - traceStartTimeMS, traceFinishTimeMS, - simulateStartTimeMS, simulateFinishTimeMS); + SchedulerMetrics schedulerMetrics = + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + if (schedulerMetrics != null) { + schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS, + simulateStartTimeMS, simulateFinishTimeMS); + } } protected 
ResourceRequest createResourceRequest( @@ -213,7 +277,7 @@ public abstract class AMSimulator extends TaskRunner.Task { List<ContainerId> toRelease) { AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class); - allocateRequest.setResponseId(RESPONSE_ID ++); + allocateRequest.setResponseId(responseId++); allocateRequest.setAskList(ask); allocateRequest.setReleaseList(toRelease); return allocateRequest; @@ -229,7 +293,7 @@ public abstract class AMSimulator extends TaskRunner.Task { protected abstract void checkStop(); - private void submitApp() + private void submitApp(ReservationId reservationId) throws YarnException, InterruptedException, IOException { // ask for new application GetNewApplicationRequest newAppRequest = @@ -249,14 +313,19 @@ public abstract class AMSimulator extends TaskRunner.Task { appSubContext.setPriority(Priority.newInstance(0)); ContainerLaunchContext conLauContext = Records.newRecord(ContainerLaunchContext.class); - conLauContext.setApplicationACLs( - new HashMap<ApplicationAccessType, String>()); + conLauContext + .setApplicationACLs(new HashMap<ApplicationAccessType, String>()); conLauContext.setCommands(new ArrayList<String>()); conLauContext.setEnvironment(new HashMap<String, String>()); conLauContext.setLocalResources(new HashMap<String, LocalResource>()); conLauContext.setServiceData(new HashMap<String, ByteBuffer>()); appSubContext.setAMContainerSpec(conLauContext); - appSubContext.setUnmanagedAM(true); + appSubContext.setResource(amContainerResource); + + if(reservationId != null) { + appSubContext.setReservationID(reservationId); + } + subAppRequest.setApplicationSubmissionContext(appSubContext); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); ugi.doAs(new PrivilegedExceptionAction<Object>() { @@ -267,22 +336,6 @@ public abstract class AMSimulator extends TaskRunner.Task { } }); LOG.info("Submit a new application {}", appId); - - // waiting until application ACCEPTED - RMApp app 
= rm.getRMContext().getRMApps().get(appId); - while(app.getState() != RMAppState.ACCEPTED) { - Thread.sleep(10); - } - - // Waiting until application attempt reach LAUNCHED - // "Unmanaged AM must register after AM attempt reaches LAUNCHED state" - this.appAttemptId = rm.getRMContext().getRMApps().get(appId) - .getCurrentAppAttempt().getAppAttemptId(); - RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId) - .getCurrentAppAttempt(); - while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) { - Thread.sleep(10); - } } private void registerAM() @@ -314,14 +367,20 @@ public abstract class AMSimulator extends TaskRunner.Task { private void trackApp() { if (isTracked) { - ((SchedulerWrapper) rm.getResourceScheduler()) - .addTrackedApp(appAttemptId, oldAppId); + SchedulerMetrics schedulerMetrics = + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + if (schedulerMetrics != null) { + schedulerMetrics.addTrackedApp(appId, oldAppId); + } } } public void untrackApp() { if (isTracked) { - ((SchedulerWrapper) rm.getResourceScheduler()) - .removeTrackedApp(appAttemptId, oldAppId); + SchedulerMetrics schedulerMetrics = + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + if (schedulerMetrics != null) { + schedulerMetrics.removeTrackedApp(oldAppId); + } } } @@ -332,26 +391,28 @@ public abstract class AMSimulator extends TaskRunner.Task { Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>(); ResourceRequest anyRequest = null; for (ContainerSimulator cs : csList) { - String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname()); - // check rack local - String rackname = rackHostNames[0]; - if (rackLocalRequestMap.containsKey(rackname)) { - rackLocalRequestMap.get(rackname).setNumContainers( - rackLocalRequestMap.get(rackname).getNumContainers() + 1); - } else { - ResourceRequest request = createResourceRequest( - cs.getResource(), rackname, priority, 1); - 
rackLocalRequestMap.put(rackname, request); - } - // check node local - String hostname = rackHostNames[1]; - if (nodeLocalRequestMap.containsKey(hostname)) { - nodeLocalRequestMap.get(hostname).setNumContainers( - nodeLocalRequestMap.get(hostname).getNumContainers() + 1); - } else { - ResourceRequest request = createResourceRequest( - cs.getResource(), hostname, priority, 1); - nodeLocalRequestMap.put(hostname, request); + if (cs.getHostname() != null) { + String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname()); + // check rack local + String rackname = "/" + rackHostNames[0]; + if (rackLocalRequestMap.containsKey(rackname)) { + rackLocalRequestMap.get(rackname).setNumContainers( + rackLocalRequestMap.get(rackname).getNumContainers() + 1); + } else { + ResourceRequest request = + createResourceRequest(cs.getResource(), rackname, priority, 1); + rackLocalRequestMap.put(rackname, request); + } + // check node local + String hostname = rackHostNames[1]; + if (nodeLocalRequestMap.containsKey(hostname)) { + nodeLocalRequestMap.get(hostname).setNumContainers( + nodeLocalRequestMap.get(hostname).getNumContainers() + 1); + } else { + ResourceRequest request = + createResourceRequest(cs.getResource(), hostname, priority, 1); + nodeLocalRequestMap.put(hostname, request); + } } // any if (anyRequest == null) { @@ -382,4 +443,12 @@ public abstract class AMSimulator extends TaskRunner.Task { public int getNumTasks() { return totalContainers; } + + public ApplicationId getApplicationId() { + return appId; + } + + public ApplicationAttemptId getApplicationAttemptId() { + return appAttemptId; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 5d005df..21bf054 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -32,7 +32,9 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -40,8 +42,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; - import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.SLSRunner; import org.slf4j.Logger; @@ -63,10 +63,10 @@ public class MRAMSimulator extends AMSimulator { private static final int PRIORITY_REDUCE = 10; private static final int PRIORITY_MAP = 20; - + // pending maps private LinkedList<ContainerSimulator> pendingMaps = - new LinkedList<ContainerSimulator>(); + new LinkedList<>(); // pending failed maps private LinkedList<ContainerSimulator> pendingFailedMaps = @@ -107,106 +107,55 @@ public class MRAMSimulator extends AMSimulator { private int mapTotal = 0; private int reduceFinished = 0; private int reduceTotal = 0; - // waiting for AM container - private boolean 
isAMContainerRunning = false; - private Container amContainer; + // finished private boolean isFinished = false; - // resource for AM container - private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; - private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1; - public final Logger LOG = LoggerFactory.getLogger(MRAMSimulator.class); + private static final Logger LOG = + LoggerFactory.getLogger(MRAMSimulator.class); - public void init(int id, int heartbeatInterval, + @SuppressWarnings("checkstyle:parameternumber") + public void init(int heartbeatInterval, List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId) { - super.init(id, heartbeatInterval, containerList, rm, se, - traceStartTime, traceFinishTime, user, queue, - isTracked, oldAppId); + boolean isTracked, String oldAppId, ReservationSubmissionRequest rr, + long baselineStartTimeMS, Resource amContainerResource) { + super.init(heartbeatInterval, containerList, rm, se, + traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, + rr, baselineStartTimeMS, amContainerResource); amtype = "mapreduce"; // get map/reduce tasks for (ContainerSimulator cs : containerList) { if (cs.getType().equals("map")) { cs.setPriority(PRIORITY_MAP); - pendingMaps.add(cs); + allMaps.add(cs); } else if (cs.getType().equals("reduce")) { cs.setPriority(PRIORITY_REDUCE); - pendingReduces.add(cs); + allReduces.add(cs); } } - allMaps.addAll(pendingMaps); - allReduces.addAll(pendingReduces); - mapTotal = pendingMaps.size(); - reduceTotal = pendingReduces.size(); + + LOG.info("Added new job with {} mapper and {} reducers", + allMaps.size(), allReduces.size()); + + mapTotal = allMaps.size(); + reduceTotal = allReduces.size(); totalContainers = mapTotal + reduceTotal; } @Override - public void firstStep() throws Exception { - super.firstStep(); - - requestAMContainer(); - } - - /** - * send out 
request for AM container - */ - protected void requestAMContainer() - throws YarnException, IOException, InterruptedException { - List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); - ResourceRequest amRequest = createResourceRequest( - BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, - MR_AM_CONTAINER_RESOURCE_VCORES), - ResourceRequest.ANY, 1, 1); - ask.add(amRequest); - LOG.debug("Application {} sends out allocate request for its AM", appId); - final AllocateRequest request = this.createAllocateRequest(ask); - - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps() - .get(appAttemptId.getApplicationId()) - .getRMAppAttempt(appAttemptId).getAMRMToken(); - ugi.addTokenIdentifier(token.decodeIdentifier()); - AllocateResponse response = ugi.doAs( - new PrivilegedExceptionAction<AllocateResponse>() { - @Override - public AllocateResponse run() throws Exception { - return rm.getApplicationMasterService().allocate(request); - } - }); - if (response != null) { - responseQueue.put(response); + public synchronized void notifyAMContainerLaunched(Container masterContainer) + throws Exception { + if (null != masterContainer) { + restart(); + super.notifyAMContainerLaunched(masterContainer); } } @Override @SuppressWarnings("unchecked") - protected void processResponseQueue() - throws InterruptedException, YarnException, IOException { - // Check whether receive the am container - if (!isAMContainerRunning) { - if (!responseQueue.isEmpty()) { - AllocateResponse response = responseQueue.take(); - if (response != null - && !response.getAllocatedContainers().isEmpty()) { - // Get AM container - Container container = response.getAllocatedContainers().get(0); - se.getNmMap().get(container.getNodeId()) - .addNewContainer(container, -1L); - // Start AM container - amContainer = container; - LOG.debug("Application {} starts its AM container ({}).", appId, - 
amContainer.getId()); - isAMContainerRunning = true; - } - } - return; - } - + protected void processResponseQueue() throws Exception { while (! responseQueue.isEmpty()) { AllocateResponse response = responseQueue.take(); @@ -227,11 +176,15 @@ public class MRAMSimulator extends AMSimulator { assignedReduces.remove(containerId); reduceFinished ++; finishedContainers ++; - } else { + } else if (amContainer.getId().equals(containerId)){ // am container released event isFinished = true; LOG.info("Application {} goes to finish.", appId); } + + if (mapFinished >= mapTotal && reduceFinished >= reduceTotal) { + lastStep(); + } } else { // container to be killed if (assignedMaps.containsKey(containerId)) { @@ -242,10 +195,9 @@ public class MRAMSimulator extends AMSimulator { LOG.debug("Application {} has one reducer killed ({}).", appId, containerId); pendingFailedReduces.add(assignedReduces.remove(containerId)); - } else { - LOG.info("Application {}'s AM is going to be killed." + - " Restarting...", appId); - restart(); + } else if (amContainer.getId().equals(containerId)){ + LOG.info("Application {}'s AM is " + + "going to be killed. 
Waiting for rescheduling...", appId); } } } @@ -253,11 +205,8 @@ public class MRAMSimulator extends AMSimulator { // check finished if (isAMContainerRunning && - (mapFinished == mapTotal) && - (reduceFinished == reduceTotal)) { - // to release the AM container - se.getNmMap().get(amContainer.getNodeId()) - .cleanupContainer(amContainer.getId()); + (mapFinished >= mapTotal) && + (reduceFinished >= reduceTotal)) { isAMContainerRunning = false; LOG.debug("Application {} sends out event to clean up" + " its AM container.", appId); @@ -291,21 +240,40 @@ public class MRAMSimulator extends AMSimulator { */ private void restart() throws YarnException, IOException, InterruptedException { - // clear - finishedContainers = 0; + // clear isFinished = false; - mapFinished = 0; - reduceFinished = 0; pendingFailedMaps.clear(); pendingMaps.clear(); pendingReduces.clear(); pendingFailedReduces.clear(); - pendingMaps.addAll(allMaps); - pendingReduces.addAll(pendingReduces); - isAMContainerRunning = false; + + // Only add totalMaps - finishedMaps + int added = 0; + for (ContainerSimulator cs : allMaps) { + if (added >= mapTotal - mapFinished) { + break; + } + pendingMaps.add(cs); + added++; + } + + // And same, only add totalReduces - finishedReduces + added = 0; + for (ContainerSimulator cs : allReduces) { + if (added >= reduceTotal - reduceFinished) { + break; + } + pendingReduces.add(cs); + added++; + } amContainer = null; - // resent am container request - requestAMContainer(); + } + + private List<ContainerSimulator> mergeLists(List<ContainerSimulator> left, List<ContainerSimulator> right) { + List<ContainerSimulator> list = new ArrayList<>(); + list.addAll(left); + list.addAll(right); + return list; } @Override @@ -317,42 +285,44 @@ public class MRAMSimulator extends AMSimulator { // send out request List<ResourceRequest> ask = null; - if (isAMContainerRunning) { - if (mapFinished != mapTotal) { - // map phase - if (! 
pendingMaps.isEmpty()) { - ask = packageRequests(pendingMaps, PRIORITY_MAP); - LOG.debug("Application {} sends out request for {} mappers.", - appId, pendingMaps.size()); - scheduledMaps.addAll(pendingMaps); - pendingMaps.clear(); - } else if (! pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) { - ask = packageRequests(pendingFailedMaps, PRIORITY_MAP); - LOG.debug("Application {} sends out requests for {} failed mappers.", - appId, pendingFailedMaps.size()); - scheduledMaps.addAll(pendingFailedMaps); - pendingFailedMaps.clear(); - } - } else if (reduceFinished != reduceTotal) { - // reduce phase - if (! pendingReduces.isEmpty()) { - ask = packageRequests(pendingReduces, PRIORITY_REDUCE); - LOG.debug("Application {} sends out requests for {} reducers.", - appId, pendingReduces.size()); - scheduledReduces.addAll(pendingReduces); - pendingReduces.clear(); - } else if (! pendingFailedReduces.isEmpty() - && scheduledReduces.isEmpty()) { - ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE); - LOG.debug("Application {} sends out request for {} failed reducers.", - appId, pendingFailedReduces.size()); - scheduledReduces.addAll(pendingFailedReduces); - pendingFailedReduces.clear(); - } + if (mapFinished != mapTotal) { + // map phase + if (!pendingMaps.isEmpty()) { + ask = packageRequests(mergeLists(pendingMaps, scheduledMaps), + PRIORITY_MAP); + LOG.debug("Application {} sends out request for {} mappers.", + appId, pendingMaps.size()); + scheduledMaps.addAll(pendingMaps); + pendingMaps.clear(); + } else if (!pendingFailedMaps.isEmpty()) { + ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps), + PRIORITY_MAP); + LOG.debug("Application {} sends out requests for {} failed mappers.", + appId, pendingFailedMaps.size()); + scheduledMaps.addAll(pendingFailedMaps); + pendingFailedMaps.clear(); + } + } else if (reduceFinished != reduceTotal) { + // reduce phase + if (!pendingReduces.isEmpty()) { + ask = packageRequests(mergeLists(pendingReduces, 
scheduledReduces), + PRIORITY_REDUCE); + LOG.debug("Application {} sends out requests for {} reducers.", + appId, pendingReduces.size()); + scheduledReduces.addAll(pendingReduces); + pendingReduces.clear(); + } else if (!pendingFailedReduces.isEmpty()) { + ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces), + PRIORITY_REDUCE); + LOG.debug("Application {} sends out request for {} failed reducers.", + appId, pendingFailedReduces.size()); + scheduledReduces.addAll(pendingFailedReduces); + pendingFailedReduces.clear(); } } + if (ask == null) { - ask = new ArrayList<ResourceRequest>(); + ask = new ArrayList<>(); } final AllocateRequest request = createAllocateRequest(ask); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java index 8fd5b3f..038f202 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java @@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.sls.conf; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; @Private @Unstable @@ -62,6 +64,14 @@ public class SLSConfiguration { public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000; public static final String AM_TYPE = AM_PREFIX + "type."; + public static final String AM_CONTAINER_MEMORY = AM_PREFIX + + "container.memory"; + public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024; + + public static 
final String AM_CONTAINER_VCORES = AM_PREFIX + + "container.vcores"; + public static final int AM_CONTAINER_VCORES_DEFAULT = 1; + // container public static final String CONTAINER_PREFIX = PREFIX + "container."; public static final String CONTAINER_MEMORY_MB = CONTAINER_PREFIX @@ -70,4 +80,9 @@ public class SLSConfiguration { public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores"; public static final int CONTAINER_VCORES_DEFAULT = 1; + public static Resource getAMContainerResource(Configuration conf) { + return Resource.newInstance( + conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT), + conf.getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT)); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java index fb1c1f4..9197b1e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.sls.nodemanager; import java.io.IOException; -import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; import java.util.List; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java new file mode 100644 index 0000000..b4ffb61 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.sls.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; + +import java.util.Map; + +public class MockAMLauncher extends ApplicationMasterLauncher + implements EventHandler<AMLauncherEvent> { + private static final Log LOG = LogFactory.getLog( + MockAMLauncher.class); + + Map<String, AMSimulator> amMap; + SLSRunner se; + + public MockAMLauncher(SLSRunner se, RMContext rmContext, + Map<String, AMSimulator> amMap) { + super(rmContext); + this.amMap = amMap; + this.se = se; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + // Do nothing + } + + @Override + protected void serviceStart() throws Exception { + // Do nothing + } + + @Override + protected void serviceStop() throws Exception { + // Do nothing + } 
+ + private void setupAMRMToken(RMAppAttempt appAttempt) { + // Setup AMRMToken + Token<AMRMTokenIdentifier> amrmToken = + super.context.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttempt.getAppAttemptId()); + ((RMAppAttemptImpl) appAttempt).setAMRMToken(amrmToken); + } + + @Override + @SuppressWarnings("unchecked") + public void handle(AMLauncherEvent event) { + if (AMLauncherEventType.LAUNCH == event.getType()) { + ApplicationId appId = + event.getAppAttempt().getAppAttemptId().getApplicationId(); + + // find AMSimulator + for (AMSimulator ams : amMap.values()) { + if (ams.getApplicationId() != null && ams.getApplicationId().equals( + appId)) { + try { + Container amContainer = event.getAppAttempt().getMasterContainer(); + + setupAMRMToken(event.getAppAttempt()); + + // Notify RMAppAttempt to change state + super.context.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(), + RMAppAttemptEventType.LAUNCHED)); + + ams.notifyAMContainerLaunched( + event.getAppAttempt().getMasterContainer()); + LOG.info("Notify AM launcher launched:" + amContainer.getId()); + + se.getNmMap().get(amContainer.getNodeId()) + .addNewContainer(amContainer, 100000000L); + + return; + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + } + + throw new YarnRuntimeException( + "Didn't find any AMSimulator for applicationId=" + appId); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org