Author: llu Date: Fri Dec 21 22:40:37 2012 New Revision: 1425171 URL: http://svn.apache.org/viewvc?rev=1425171&view=rev Log: MAPREDUCE-4660. Update task placement policy for network topology with node group. (Junping Du via llu)
Added: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java hadoop/common/branches/branch-1/src/mapred/mapred-default.xml hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Fri Dec 21 22:40:37 2012 @@ -6,6 +6,9 @@ Release 1.2.0 - unreleased NEW FEATURES + MAPREDUCE-4660. Update task placement policy for network topology + with node group. (Junping Du via llu) + HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child hadoop client processes. 
(Yu Gao via llu) Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Dec 21 22:40:37 2012 @@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.serve import org.apache.hadoop.metrics.MetricsContext; import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.metrics.Updater; +import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -327,8 +328,17 @@ public class FairScheduler extends TaskS public void jobAdded(JobInProgress job) { synchronized (FairScheduler.this) { eventLog.log("JOB_ADDED", job.getJobID()); - JobInfo info = new JobInfo(new JobSchedulable(FairScheduler.this, job, TaskType.MAP), - new JobSchedulable(FairScheduler.this, job, TaskType.REDUCE)); + JobSchedulable mapSched = ReflectionUtils.newInstance( + conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class, + JobSchedulable.class), conf); + mapSched.init(FairScheduler.this, job, TaskType.MAP); + + JobSchedulable redSched = ReflectionUtils.newInstance( + conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class, + JobSchedulable.class), conf); + redSched.init(FairScheduler.this, job, TaskType.REDUCE); + + JobInfo info = new JobInfo(mapSched, redSched); infos.put(job, info); poolMgr.addJob(job); // Also adds job into the right PoolScheduable update(); @@ -585,8 +595,10 @@ public class FairScheduler extends TaskS 
private void updateLastMapLocalityLevel(JobInProgress job, Task mapTaskLaunched, TaskTrackerStatus tracker) { JobInfo info = infos.get(job); + boolean isNodeGroupAware = conf.getBoolean( + "net.topology.nodegroup.aware", false); LocalityLevel localityLevel = LocalityLevel.fromTask( - job, mapTaskLaunched, tracker); + job, mapTaskLaunched, tracker, isNodeGroupAware); info.lastMapLocalityLevel = localityLevel; info.timeWaitedForLocalMap = 0; eventLog.log("ASSIGNED_LOC_LEVEL", job.getJobID(), localityLevel); Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (original) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Fri Dec 21 22:40:37 2012 @@ -25,9 +25,9 @@ import org.apache.hadoop.mapred.FairSche import org.apache.hadoop.mapreduce.TaskType; public class JobSchedulable extends Schedulable { - private FairScheduler scheduler; - private JobInProgress job; - private TaskType taskType; + protected FairScheduler scheduler; + protected JobInProgress job; + protected TaskType taskType; private int demand = 0; public JobSchedulable(FairScheduler scheduler, JobInProgress job, @@ -38,6 +38,18 @@ public class JobSchedulable extends Sche initMetrics(); } + + public JobSchedulable() { + } + + public void init(FairScheduler scheduler, JobInProgress job, + TaskType taskType) { + this.scheduler = scheduler; + this.job = job; + this.taskType = taskType; + + initMetrics(); + } @Override public TaskType getTaskType() { @@ -87,7 +99,7 @@ public class JobSchedulable extends 
Sche } } - private boolean isRunnable() { + protected boolean isRunnable() { JobInfo info = scheduler.getJobInfo(job); int runState = job.getStatus().getRunState(); return (info != null && info.runnable && runState == JobStatus.RUNNING); Added: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java?rev=1425171&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java (added) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java Fri Dec 21 22:40:37 2012 @@ -0,0 +1,62 @@ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.hadoop.mapreduce.TaskType; + +public class JobSchedulableWithNodeGroup extends JobSchedulable { + + public JobSchedulableWithNodeGroup(FairScheduler scheduler, + JobInProgress job, TaskType taskType) { + super(scheduler, job, taskType); + } + + public JobSchedulableWithNodeGroup() { + } + + @Override + public Task assignTask(TaskTrackerStatus tts, long currentTime, + Collection<JobInProgress> visited) throws IOException { + if (isRunnable()) { + visited.add(job); + TaskTrackerManager ttm = scheduler.taskTrackerManager; + ClusterStatus clusterStatus = ttm.getClusterStatus(); + int numTaskTrackers = clusterStatus.getTaskTrackers(); + + // check with the load manager whether it is safe to + // launch this task on this taskTracker. 
+ LoadManager loadMgr = scheduler.getLoadManager(); + if (!loadMgr.canLaunchTask(tts, job, taskType)) { + return null; + } + if (taskType == TaskType.MAP) { + LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel( + job, currentTime); + scheduler.getEventLog().log( + "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel); + switch (localityLevel) { + case NODE: + return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers, + ttm.getNumberOfUniqueHosts()); + case NODEGROUP: + // locality level for nodegroup is 2 + return job.obtainNewMapTaskCommon(tts, numTaskTrackers, + ttm.getNumberOfUniqueHosts(), 2); + case RACK: + return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers, + ttm.getNumberOfUniqueHosts()); + default: + return job.obtainNewMapTask(tts, numTaskTrackers, + ttm.getNumberOfUniqueHosts()); + } + } else { + return job.obtainNewReduceTask(tts, numTaskTrackers, + ttm.getNumberOfUniqueHosts()); + } + } else { + return null; + } + } + +} Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java (original) +++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java Fri Dec 21 22:40:37 2012 @@ -23,13 +23,14 @@ package org.apache.hadoop.mapred; * is allowed to launch tasks. By default, jobs are not allowed to launch * non-data-local tasks until they have waited a small number of seconds to * find a slot on a node that they have data on. 
If a job has waited this - * long, it is allowed to launch rack-local tasks as well (on nodes that may - * not have the task's input data, but share a rack with a node that does). - * Finally, after a further wait, jobs are allowed to launch tasks anywhere - * in the cluster. + * long, it is allowed to launch tasks at other locality levels as well: + * nodegroup-local (if the topology supports a nodegroup layer) or rack-local (on + * nodes that may not have the task's input data, but share a rack with a node + * that does). Finally, after a further wait, jobs are allowed to launch tasks + * anywhere in the cluster. * - * This enum defines three levels - NODE, RACK and ANY (for allowing tasks - * to be launched on any node). A map task's level can be obtained from + * This enum defines four levels - NODE, NODEGROUP, RACK and ANY (for allowing + * tasks to be launched on any node). A map task's level can be obtained from * its job through {@link #fromTask(JobInProgress, Task, TaskTrackerStatus)}. In * addition, for any locality level, it is possible to get a "level cap" to pass * to {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)} @@ -37,16 +38,25 @@ package org.apache.hadoop.mapred; * the {@link #toCacheLevelCap()} method.
*/ public enum LocalityLevel { - NODE, RACK, ANY; + NODE, NODEGROUP, RACK, ANY; public static LocalityLevel fromTask(JobInProgress job, Task mapTask, - TaskTrackerStatus tracker) { + TaskTrackerStatus tracker, boolean isNodeGroupAware) { TaskID tipID = mapTask.getTaskID().getTaskID(); TaskInProgress tip = job.getTaskInProgress(tipID); - switch (job.getLocalityLevel(tip, tracker)) { - case 0: return LocalityLevel.NODE; - case 1: return LocalityLevel.RACK; - default: return LocalityLevel.ANY; + if (isNodeGroupAware) { + switch (job.getLocalityLevel(tip, tracker)) { + case 0: return LocalityLevel.NODE; + case 1: return LocalityLevel.NODEGROUP; + case 2: return LocalityLevel.RACK; + default: return LocalityLevel.ANY; + } + } else { + switch (job.getLocalityLevel(tip, tracker)) { + case 0: return LocalityLevel.NODE; + case 1: return LocalityLevel.RACK; + default: return LocalityLevel.ANY; + } } } @@ -55,11 +65,20 @@ public enum LocalityLevel { * {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)} * to ensure that only tasks of this locality level and lower are launched. 
*/ - public int toCacheLevelCap() { - switch(this) { - case NODE: return 1; - case RACK: return 2; - default: return Integer.MAX_VALUE; + public int toCacheLevelCap(boolean isNodeGroupAware) { + if (isNodeGroupAware) { + switch(this) { + case NODE: return 1; + case NODEGROUP: return 2; + case RACK: return 3; + default: return Integer.MAX_VALUE; + } + } else { + switch(this) { + case NODE: return 1; + case RACK: return 2; + default: return Integer.MAX_VALUE; + } } } } Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original) +++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Fri Dec 21 22:40:37 2012 @@ -297,6 +297,20 @@ </property> <property> + <name>mapred.jobtracker.nodegroup.aware</name> + <value>false</value> + <description>Identify if jobtracker is aware of nodegroup layer.</description> +</property> + +<property> + <name>mapred.jobtracker.jobSchedulable</name> + <value>org.apache.hadoop.mapred.JobSchedulable</value> + <description>The class responsible for an entity in FairScheduler that can + launch tasks. 
+ </description> +</property> + +<property> <name>mapred.jobtracker.taskScheduler.maxRunningTasksPerJob</name> <value></value> <description>The maximum number of running tasks for a job before Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Dec 21 22:40:37 2012 @@ -47,7 +47,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; import org.apache.hadoop.mapred.Counters.CountersExceededException; -import org.apache.hadoop.mapred.Counters.Group; import org.apache.hadoop.mapred.JobHistory.Values; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobSubmissionFiles; @@ -70,7 +69,7 @@ import org.apache.hadoop.util.StringUtil /************************************************************* * JobInProgress maintains all the info for keeping - * a Job on the straight and narrow. It keeps its JobProfile + * a Job on the straight and narrow. It keeps its JobProfile * and its latest JobStatus, plus a set of tables for * doing bookkeeping of its Tasks. 
* *********************************************************** @@ -255,7 +254,6 @@ public class JobInProgress { private String submitHostAddress; private String user; private String historyFile = ""; - private boolean historyFileCopied; // Per-job counters public static enum Counter { @@ -265,6 +263,7 @@ public class JobInProgress { TOTAL_LAUNCHED_REDUCES, OTHER_LOCAL_MAPS, DATA_LOCAL_MAPS, + NODEGROUP_LOCAL_MAPS, RACK_LOCAL_MAPS, SLOTS_MILLIS_MAPS, SLOTS_MILLIS_REDUCES, @@ -519,7 +518,7 @@ public class JobInProgress { public void cleanUpMetrics() { // per job metrics is disabled for now. } - + private void printCache (Map<Node, List<TaskInProgress>> cache) { LOG.info("The taskcache info:"); for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) { @@ -701,8 +700,8 @@ public class JobInProgress { TaskSplitMetaInfo[] splits = createSplits(jobId); if (numMapTasks != splits.length) { throw new IOException("Number of maps in JobConf doesn't match number of " + - "recieved splits for job " + jobId + "! " + - "numMapTasks=" + numMapTasks + ", #splits=" + splits.length); + "recieved splits for job " + jobId + "! " + + "numMapTasks=" + numMapTasks + ", #splits=" + splits.length); } numMapTasks = splits.length; @@ -1310,27 +1309,49 @@ public class JobInProgress { int clusterSize, int numUniqueHosts ) throws IOException { - if (status.getRunState() != JobStatus.RUNNING) { + return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, + anyCacheLevel); + } + + /** + * Return a MapTask with locality level that smaller or equal than a given + * locality level to tasktracker. + * + * @param tts The task tracker that is asking for a task + * @param clusterSize The number of task trackers in the cluster + * @param numUniqueHosts The number of hosts that run task trackers + * @param avgProgress The average progress of this kind of task in this job + * @param maxCacheLevel The maximum topology level until which to schedule + * maps. 
+ * @return the index in tasks of the selected task (or -1 for no task) + * @throws IOException + */ + public synchronized Task obtainNewMapTaskCommon( + TaskTrackerStatus tts, int clusterSize, int numUniqueHosts, + int maxCacheLevel) throws IOException { + if (!tasksInited) { LOG.info("Cannot create task split for " + profile.getJobID()); try { throw new IOException("state = " + status.getRunState()); } catch (IOException ioe) {ioe.printStackTrace();} return null; } - - int target = findNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel, + + int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxCacheLevel, status.mapProgress()); if (target == -1) { return null; } - + Task result = maps[target].getTaskToRun(tts.getTrackerName()); if (result != null) { addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true); - resetSchedulingOpportunities(); + // DO NOT reset for off-switch! + if (maxCacheLevel != NON_LOCAL_CACHE_LEVEL) { + resetSchedulingOpportunities(); + } } - return result; - } + } /* * Return task cleanup attempt if any, to run on a given tracker @@ -1373,78 +1394,22 @@ public class JobInProgress { public synchronized Task obtainNewNodeLocalMapTask(TaskTrackerStatus tts, int clusterSize, int numUniqueHosts) - throws IOException { - if (!tasksInited) { - LOG.info("Cannot create task split for " + profile.getJobID()); - try { throw new IOException("state = " + status.getRunState()); } - catch (IOException ioe) {ioe.printStackTrace();} - return null; - } - - int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 1, - status.mapProgress()); - if (target == -1) { - return null; - } - - Task result = maps[target].getTaskToRun(tts.getTrackerName()); - if (result != null) { - addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true); - resetSchedulingOpportunities(); - } - - return result; + throws IOException { + return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, 1); } public synchronized Task 
obtainNewNodeOrRackLocalMapTask( TaskTrackerStatus tts, int clusterSize, int numUniqueHosts) throws IOException { - if (!tasksInited) { - LOG.info("Cannot create task split for " + profile.getJobID()); - try { throw new IOException("state = " + status.getRunState()); } - catch (IOException ioe) {ioe.printStackTrace();} - return null; - } - - int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, - status.mapProgress()); - if (target == -1) { - return null; - } - - Task result = maps[target].getTaskToRun(tts.getTrackerName()); - if (result != null) { - addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true); - resetSchedulingOpportunities(); - } - - return result; + return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, maxLevel); } public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts, int clusterSize, int numUniqueHosts) - throws IOException { - if (!tasksInited) { - LOG.info("Cannot create task split for " + profile.getJobID()); - try { throw new IOException("state = " + status.getRunState()); } - catch (IOException ioe) {ioe.printStackTrace();} - return null; - } - - int target = findNewMapTask(tts, clusterSize, numUniqueHosts, - NON_LOCAL_CACHE_LEVEL, status.mapProgress()); - if (target == -1) { - return null; - } - - Task result = maps[target].getTaskToRun(tts.getTrackerName()); - if (result != null) { - addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true); - // DO NOT reset for off-switch! 
- } - - return result; + throws IOException { + return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, + NON_LOCAL_CACHE_LEVEL); } public void schedulingOpportunity() { @@ -1749,7 +1714,7 @@ public class JobInProgress { // keeping the earlier ordering intact String name; String splits = ""; - Enum counter = null; + Enum<Counter> counter = null; if (tip.isJobSetupTask()) { launchedSetup = true; name = Values.SETUP.name(); @@ -1815,26 +1780,57 @@ public class JobInProgress { } } } - switch (level) { - case 0 : - LOG.info("Choosing data-local task " + tip.getTIPId()); - jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1); + logAndIncreJobCounters(tip, level, jobtracker.isNodeGroupAware()); + } + } + + private void logAndIncreJobCounters(TaskInProgress tip, int level, + boolean isNodeGroupAware) { + switch (level) { + case 0: + logAndIncrDataLocalMaps(tip); break; case 1: - LOG.info("Choosing rack-local task " + tip.getTIPId()); - jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1); + if (isNodeGroupAware) { + logAndIncrNodeGroupLocalMaps(tip); + } else { + logAndIncrRackLocalMaps(tip); + } break; - default : + case 2: + if (isNodeGroupAware) { + logAndIncrRackLocalMaps(tip); + } + break; + default: // check if there is any locality if (level != this.maxLevel) { - LOG.info("Choosing cached task at level " + level + tip.getTIPId()); - jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1); + logAndIncrOtherLocalMaps(tip, level); } break; - } } } + private void logAndIncrOtherLocalMaps(TaskInProgress tip, int level) { + LOG.info("Choosing cached task at level " + level + tip.getTIPId()); + jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1); + } + + private void logAndIncrNodeGroupLocalMaps(TaskInProgress tip) { + LOG.info("Choosing nodeGroup-local task " + tip.getTIPId()); + jobCounters.incrCounter(Counter.NODEGROUP_LOCAL_MAPS, 1); + } + + private void logAndIncrRackLocalMaps(TaskInProgress tip) { + LOG.info("Choosing rack-local task " + tip.getTIPId()); + 
jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1); + } + + private void logAndIncrDataLocalMaps(TaskInProgress tip) { + LOG.info("Choosing data-local task " + tip.getTIPId()); + jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1); + } + void setFirstTaskLaunchTime(TaskInProgress tip) { TaskType key = tip.getFirstTaskType(); Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties Fri Dec 21 22:40:37 2012 @@ -22,6 +22,7 @@ TOTAL_LAUNCHED_REDUCES.name= Launc OTHER_LOCAL_MAPS.name= Other local map tasks DATA_LOCAL_MAPS.name= Data-local map tasks RACK_LOCAL_MAPS.name= Rack-local map tasks +NODEGROUP_LOCAL_MAPS.name= NodeGroup-local map tasks FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms) FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms) Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Fri Dec 21 22:40:37 2012 @@ -172,7 +172,8 @@ class 
JobQueueTaskScheduler extends Task Task t = null; - // Try to schedule a node-local or rack-local Map task + // Try to schedule a Map task with locality between node-local + // and rack-local t = job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Dec 21 22:40:37 2012 @@ -204,8 +204,9 @@ public class JobTracker implements MRCon static final String JOB_INFO_FILE = "job-info"; static final String JOB_TOKEN_FILE = "jobToken"; private DNSToSwitchMapping dnsToSwitchMapping; - private NetworkTopology clusterMap = new NetworkTopology(); + private NetworkTopology clusterMap; private int numTaskCacheLevels; // the max level to which we cache tasks + private boolean isNodeGroupAware; /** * {@link #nodesAtMaxLevel} is using the keySet from {@link ConcurrentHashMap} * so that it can be safely written to and iterated on via 2 separate threads. @@ -1935,6 +1936,11 @@ public class JobTracker implements MRCon LOG.info("Starting jobtracker with owner as " + getMROwner().getShortUserName()); + // Create network topology + clusterMap = (NetworkTopology) ReflectionUtils.newInstance( + conf.getClass("net.topology.impl", NetworkTopology.class, + NetworkTopology.class), conf); + // Create the scheduler Class<? 
extends TaskScheduler> schedulerClass = conf.getClass("mapred.jobtracker.taskScheduler", @@ -2000,6 +2006,8 @@ public class JobTracker implements MRCon DNSToSwitchMapping.class), conf); this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", NetworkTopology.DEFAULT_HOST_LEVEL); + this.isNodeGroupAware = conf.getBoolean( + "mapred.jobtracker.nodegroup.aware", false); plugins = conf.getInstances("mapreduce.jobtracker.plugins", ServicePlugin.class); @@ -2821,7 +2829,9 @@ public class JobTracker implements MRCon public int getNumberOfUniqueHosts() { return uniqueHostsMap.size(); } - + public boolean isNodeGroupAware() { + return isNodeGroupAware; + } public void addJobInProgressListener(JobInProgressListener listener) { jobInProgressListeners.add(listener); } @@ -2971,7 +2981,7 @@ public class JobTracker implements MRCon } } } - + // Check for tasks to be killed List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName); if (killTasksList != null) { Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1425171&r1=1425170&r2=1425171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Dec 21 22:40:37 2012 @@ -56,7 +56,7 @@ public class MiniMRCluster { private String namenode; private UserGroupInformation ugi = null; - private JobConf conf; + protected JobConf conf; private int numTrackerToExclude; private JobConf job; Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java?rev=1425171&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java Fri Dec 21 22:40:37 2012 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.StaticMapping; + +public class MiniMRClusterWithNodeGroup extends MiniMRCluster { + + private String[] nodeGroups; + + public MiniMRClusterWithNodeGroup(int numTaskTrackers, String namenode, int numDir, + String[] racks, String[] nodeGroups, String[] hosts, JobConf conf) throws IOException { + super(numTaskTrackers, namenode, numDir, racks, hosts, conf); + this.nodeGroups = nodeGroups; + } + + /** + * Start the tasktracker. 
+ */ + @Override + public void startTaskTracker(String host, String rack, + int idx, int numDir) throws IOException { + if (rack != null && nodeGroups != null) { + StaticMapping.addNodeToRack(host, rack + nodeGroups); + } + + if (host != null) { + NetUtils.addStaticResolution(host, "localhost"); + } + + TaskTrackerRunner taskTracker; + taskTracker = new TaskTrackerRunner(idx, numDir, host, conf); + + addTaskTracker(taskTracker); + } + +} Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java?rev=1425171&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java Fri Dec 21 22:40:37 2012 @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.net.StaticMapping;
+import org.junit.BeforeClass;
+
+/**
+ * Checks map-task placement when the node-group network topology, the
+ * node-group block placement policy and JobSchedulableWithNodeGroup are
+ * enabled: the job counters must report the expected numbers of data-local,
+ * nodegroup-local, rack-local and other-local maps.
+ */
+public class TestNodeGroupAwareTaskPlacement extends TestCase {
+
+  // datanode1: /r1/nodegroup1.
+  private static final String[] rack1 = {"/r1"};
+  private static final String[] nodeGroup1 = {"/nodegroup1"};
+  private static final String[] hosts1 = {"host1.nodegroup1.rack1"};
+
+  // datanode2: /r1/nodegroup2, datanode3: /r2/nodegroup3.
+  private static final String[] rack2 = {"/r1", "/r2"};
+  private static final String[] nodeGroup2 = {"/nodegroup2", "/nodegroup3"};
+  private static final String[] hosts2 = {
+    "host2.nodegroup2.rack1", "host2.nodegroup3.rack2"
+  };
+
+  // First tasktracker: same host name and location as datanode3.
+  private static final String[] hosts3 = {"host2.nodegroup3.rack2"};
+  private static final String[] nodeGroup3 = {"/nodegroup3"};
+  private static final String[] rack3 = {"/r2"};
+
+  // Second tasktracker: in /r1/nodegroup1 with datanode1, on a host that
+  // runs no datanode.
+  private static final String[] hosts4 = {"host3.nodegroup1.rack1"};
+  private static final String[] nodeGroup4 = {"/nodegroup1"};
+  private static final String[] rack4 = {"/r1"};
+
+  final Path inDir = new Path("/nodegrouptesting");
+  final Path outputPath = new Path("/output");
+
+  /**
+   * Launches a MR job and checks its locality counters against the expected
+   * values. If every check passes, waits for the cluster to become idle and
+   * then shuts it down; if a check fails, the cluster is left running and the
+   * caller remains responsible for shutting it down.
+   *
+   * @param jobName The name for the job
+   * @param mr The MR cluster; shut down by this method when all checks pass
+   * @param fileSys The FileSystem
+   * @param in Input path
+   * @param out Output path; deleted first if it already exists
+   * @param numMaps Number of maps
+   * @param otherLocalMaps Expected value of other local maps
+   * @param rackLocalMaps Expected value of rack local maps
+   * @param nodeGroupLocalMaps Expected value of nodeGroup local maps
+   * @param dataLocalMaps Expected value of data(node) local maps
+   * @param jobConfig Configuration for running job
+   * @throws IOException propagated from the file-system calls and from
+   *     running the job
+   */
+  static void launchJobAndTestCounters(String jobName, MiniMRCluster mr,
+      FileSystem fileSys, Path in, Path out,
+      int numMaps, int otherLocalMaps,
+      int rackLocalMaps, int nodeGroupLocalMaps,
+      int dataLocalMaps, JobConf jobConfig)
+      throws IOException {
+    JobConf jobConf = mr.createJobConf(jobConfig);
+    if (fileSys.exists(out)) {
+      fileSys.delete(out, true);
+    }
+    RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
+    Counters counters = job.getCounters();
+    // assertEquals takes (message, expected, actual); keeping that order
+    // makes failure messages report the values the right way round.
+    assertEquals("Number of Other-local maps", otherLocalMaps,
+        counters.getCounter(JobInProgress.Counter.OTHER_LOCAL_MAPS));
+    assertEquals("Number of Data-local maps", dataLocalMaps,
+        counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS));
+    assertEquals("Number of NodeGroup-local maps", nodeGroupLocalMaps,
+        counters.getCounter(JobInProgress.Counter.NODEGROUP_LOCAL_MAPS));
+    assertEquals("Number of Rack-local maps", rackLocalMaps,
+        counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS));
+
+    mr.waitUntilIdle();
+    mr.shutdown();
+  }
+
+  /**
+   * Registers the network location (rack path + node group) of every
+   * datanode and tasktracker host used by this test with
+   * {@link StaticMapping}. Runs before each test method as the JUnit 3
+   * {@link TestCase#setUp()} override (a JUnit 4 {@code @BeforeClass}
+   * annotation would be ignored for a TestCase subclass).
+   */
+  @Override
+  public void setUp() {
+    StaticMapping.addNodeToRack(hosts1[0], rack1[0] + nodeGroup1[0]);
+    StaticMapping.addNodeToRack(hosts2[0], rack2[0] + nodeGroup2[0]);
+    // hosts3[0] is the same host as hosts2[1], so this also maps it.
+    StaticMapping.addNodeToRack(hosts2[1], rack2[1] + nodeGroup2[1]);
+    StaticMapping.addNodeToRack(hosts4[0], rack4[0] + nodeGroup4[0]);
+  }
+
+  public void testTaskPlacement() throws IOException {
+    String namenode = null;
+    MiniDFSClusterWithNodeGroup dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    String testName = "TestForNodeGroupAwareness";
+    try {
+      final int taskTrackers = 1;
+
+      /* Start 3 datanodes: datanode1 in r1/nodegroup1, datanode2 in
+       * r1/nodegroup2 and datanode3 in r2/nodegroup3. Create three
+       * files (splits):
+       * 1) file1, just after starting datanode1, with a repl factor of 1;
+       * 2) file2 & file3, after starting datanode2 and datanode3, with a
+       *    repl factor of 3.
+       * At the end, file1 is present only on datanode1, while file2 and
+       * file3 are present on all three datanodes.
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+
+      conf.set("dfs.block.replicator.classname",
+          "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
+
+      conf.set("net.topology.impl",
+          "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+
+      conf.setBoolean("net.topology.nodegroup.aware", true);
+
+      conf.setBoolean("mapred.jobtracker.nodegroup.aware", true);
+      conf.setInt("mapred.task.cache.levels", 3);
+
+      conf.set("mapred.jobtracker.jobSchedulable",
+          "org.apache.hadoop.mapred.JobSchedulableWithNodeGroup");
+
+      JobConf jobConf = new JobConf(conf);
+
+      MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroup1);
+      // start the dfs cluster with datanode1 only.
+      dfs = new MiniDFSClusterWithNodeGroup(0, conf, 1,
+          true, true, null, rack1, hosts1, null);
+
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      // write file1 on datanode1 with 1 replica
+      UtilsForTests.writeFile(
+          dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
+      // start another two datanodes (2 and 3)
+      dfs.startDataNodes(conf, 2, true, null, rack2, nodeGroup2, hosts2, null);
+
+      dfs.waitActive();
+      // Write two files with 3 replicas each, so every datanode holds a
+      // replica of both file2 and file3.
+      UtilsForTests.writeFile(
+          dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
+      UtilsForTests.writeFile(
+          dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
+          (dfs.getFileSystem()).getUri().getPort();
+      /* Run a job with the (only) tasktracker, which is under r2/nodegroup3,
+       * and check how many data/nodegroup/rack local maps it runs. The
+       * hostname of the tasktracker is the same as datanode3's.
+       */
+      mr = new MiniMRClusterWithNodeGroup(taskTrackers, namenode, 1, rack3,
+          nodeGroup3, hosts3, jobConf);
+      /* The job is configured with three maps since there are three
+       * (non-splittable) files. file2 and file3 have repl of three, so their
+       * blocks are present on every datanode, in particular on datanode3,
+       * whose host also runs the tasktracker: those 2 maps are expected to be
+       * data-local. file1 exists only on datanode1 in /r1, so its map is
+       * neither data-, nodegroup- nor rack-local.
+       * NOTE(review): that off-rack map is expected NOT to be counted in
+       * OTHER_LOCAL_MAPS (expected 0 below); confirm against how
+       * JobInProgress updates that counter.
+       */
+      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+          0, 0, 2, jobConf);
+      // launchJobAndTestCounters has already shut this cluster down.
+      mr = null;
+
+      /* Run a job with the (only) tasktracker on host3.nodegroup1.rack1
+       * (hosts4), which is in r1/nodegroup1 but runs no datanode.
+       */
+      mr = new MiniMRClusterWithNodeGroup(taskTrackers, namenode, 1, rack4,
+          nodeGroup4, hosts4, jobConf);
+
+      /* The job is configured with three maps since there are three
+       * (non-splittable) files. Given how replication was set up when the
+       * files were created, all three files are on datanode1, which is in the
+       * same nodegroup as the only tasktracker. Thus, the result should be 3
+       * nodegroup-local maps. Since no datanode runs on the tasktracker's
+       * host, this verifies that nodegroup-level locality is still achieved
+       * when compute and data nodes are separated.
+       */
+      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+          0, 3, 0, jobConf);
+      // launchJobAndTestCounters has already shut this cluster down.
+      mr = null;
+    } finally {
+      // Stop MapReduce (which talks to the namenode) before DFS, and stop DFS
+      // even if the MapReduce shutdown throws.
+      try {
+        if (mr != null) {
+          mr.shutdown();
+        }
+      } finally {
+        if (dfs != null) {
+          dfs.shutdown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Configures a map-only identity job (IdentityMapper, zero reducers) over
+   * the non-splittable sequence files under {@code inDir} and runs it with
+   * {@link JobClient#runJob(JobConf)}.
+   *
+   * @param jobConf configuration to populate and submit; modified in place
+   * @param inDir input directory
+   * @param outputPath output directory for the SequenceFile output
+   * @param numMaps number of map tasks requested
+   * @param jobName name for the job
+   * @return the handle of the completed job
+   * @throws IOException propagated from {@link JobClient#runJob(JobConf)}
+   */
+  static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
+      int numMaps, String jobName) throws IOException {
+    jobConf.setJobName(jobName);
+    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    FileOutputFormat.setOutputPath(jobConf, outputPath);
+    jobConf.setMapperClass(IdentityMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(0);
+    jobConf.setJar("build/test/testjar/testjob.jar");
+    return JobClient.runJob(jobConf);
+  }
+}