Author: sseth Date: Tue Jan 29 00:23:27 2013 New Revision: 1439715 URL: http://svn.apache.org/viewvc?rev=1439715&view=rev Log: merge MAPREDUCE-4838 from trunk. Add additional fields like Locality, Avataar to the JobHistory logs. Contributed by Zhijie Shen
Added: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Avataar.java - copied unchanged from r1439714, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Avataar.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Locality.java - copied unchanged from r1439714, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Locality.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Jan 29 00:23:27 2013 @@ -58,6 +58,9 @@ Release 2.0.3-alpha - Unreleased MAPREDUCE-4809. Change visibility of classes for pluggable sort changes. (masokan via tucu) + MAPREDUCE-4838. Add additional fields like Locality, Avataar to the + JobHistory logs. 
(Zhijie Shen via sseth) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Jan 29 00:23:27 2013 @@ -28,6 +28,7 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -1192,6 +1193,39 @@ public class JobImpl implements org.apac } } */ + /** + * Get the workflow adjacencies from the job conf + * The string returned is of the form "key"="value" "key"="value" ... 
+ */ + private static String getWorkflowAdjacencies(Configuration conf) { + int prefixLen = MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING.length(); + Map<String,String> adjacencies = + conf.getValByRegex(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN); + if (adjacencies.isEmpty()) { + return ""; + } + int size = 0; + for (Entry<String,String> entry : adjacencies.entrySet()) { + int keyLen = entry.getKey().length(); + size += keyLen - prefixLen; + size += entry.getValue().length() + 6; + } + StringBuilder sb = new StringBuilder(size); + for (Entry<String,String> entry : adjacencies.entrySet()) { + int keyLen = entry.getKey().length(); + sb.append("\""); + sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen))); + sb.append("\"=\""); + sb.append(escapeString(entry.getValue())); + sb.append("\" "); + } + return sb.toString(); + } + + public static String escapeString(String data) { + return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR, + new char[] {'"', '=', '.'}); + } public static class InitTransition implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> { @@ -1217,7 +1251,11 @@ public class JobImpl implements org.apac job.conf.get(MRJobConfig.USER_NAME, "mapred"), job.appSubmitTime, job.remoteJobConfFile.toString(), - job.jobACLs, job.queueName); + job.jobACLs, job.queueName, + job.conf.get(MRJobConfig.WORKFLOW_ID, ""), + job.conf.get(MRJobConfig.WORKFLOW_NAME, ""), + job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""), + getWorkflowAdjacencies(job.conf)); job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse)); //TODO JH Verify jobACLs, UserName via UGI? 
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Jan 29 00:23:27 2013 @@ -66,6 +66,8 @@ import org.apache.hadoop.mapreduce.jobhi import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.api.records.Avataar; +import org.apache.hadoop.mapreduce.v2.api.records.Locality; import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; @@ -156,7 +158,8 @@ public abstract class TaskAttemptImpl im private final org.apache.hadoop.mapred.JobID oldJobId; private final TaskAttemptListener taskAttemptListener; private final Resource resourceCapability; - private final String[] dataLocalHosts; + protected Set<String> dataLocalHosts; + protected Set<String> dataLocalRacks; private final List<String> diagnostics = new ArrayList<String>(); private final Lock readLock; private final Lock 
writeLock; @@ -175,6 +178,8 @@ public abstract class TaskAttemptImpl im private int shufflePort = -1; private String trackerName; private int httpPort; + private Locality locality; + private Avataar avataar; private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition(); @@ -532,8 +537,16 @@ public abstract class TaskAttemptImpl im getMemoryRequired(conf, taskId.getTaskType())); this.resourceCapability.setVirtualCores( getCpuRequired(conf, taskId.getTaskType())); - this.dataLocalHosts = dataLocalHosts; + + this.dataLocalHosts = resolveHosts(dataLocalHosts); RackResolver.init(conf); + this.dataLocalRacks = new HashSet<String>(); + for (String host : this.dataLocalHosts) { + this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation()); + } + + locality = Locality.OFF_SWITCH; + avataar = Avataar.VIRGIN; // This "this leak" is okay because the retained pointer is in an // instance variable. @@ -1032,6 +1045,23 @@ public abstract class TaskAttemptImpl im } } + public Locality getLocality() { + return locality; + } + + public void setLocality(Locality locality) { + this.locality = locality; + } + + public Avataar getAvataar() + { + return avataar; + } + + public void setAvataar(Avataar avataar) { + this.avataar = avataar; + } + private static TaskAttemptState getExternalState( TaskAttemptStateInternal smState) { switch (smState) { @@ -1232,25 +1262,27 @@ public abstract class TaskAttemptImpl im taskAttempt.attemptId, taskAttempt.resourceCapability)); } else { - Set<String> racks = new HashSet<String>(); - for (String host : taskAttempt.dataLocalHosts) { - racks.add(RackResolver.resolve(host).getNetworkLocation()); - } taskAttempt.eventHandler.handle(new ContainerRequestEvent( - taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt - .resolveHosts(taskAttempt.dataLocalHosts), racks - .toArray(new String[racks.size()]))); + taskAttempt.attemptId, taskAttempt.resourceCapability, + 
taskAttempt.dataLocalHosts.toArray( + new String[taskAttempt.dataLocalHosts.size()]), + taskAttempt.dataLocalRacks.toArray( + new String[taskAttempt.dataLocalRacks.size()]))); } } } - protected String[] resolveHosts(String[] src) { - String[] result = new String[src.length]; - for (int i = 0; i < src.length; i++) { - if (isIP(src[i])) { - result[i] = resolveHost(src[i]); - } else { - result[i] = src[i]; + protected Set<String> resolveHosts(String[] src) { + Set<String> result = new HashSet<String>(); + if (src != null) { + for (int i = 0; i < src.length; i++) { + if (src[i] == null) { + continue; + } else if (isIP(src[i])) { + result.add(resolveHost(src[i])); + } else { + result.add(src[i]); + } } } return result; @@ -1300,6 +1332,20 @@ public abstract class TaskAttemptImpl im taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); + + taskAttempt.locality = Locality.OFF_SWITCH; + if (taskAttempt.dataLocalHosts.size() > 0) { + String cHost = taskAttempt.resolveHost( + taskAttempt.containerNodeId.getHost()); + if (taskAttempt.dataLocalHosts.contains(cHost)) { + taskAttempt.locality = Locality.NODE_LOCAL; + } + } + if (taskAttempt.locality == Locality.OFF_SWITCH) { + if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) { + taskAttempt.locality = Locality.RACK_LOCAL; + } + } //launch the container //create the container object to be launched for a given Task attempt @@ -1376,7 +1422,7 @@ public abstract class TaskAttemptImpl im taskAttempt.attemptId.getTaskId().getJobId(), tauce)); } else { LOG.debug("Not generating HistoryFinish event since start event not " + - "generated for taskAttempt: " + taskAttempt.getID()); + "generated for taskAttempt: " + taskAttempt.getID()); } } } @@ -1421,7 +1467,8 @@ public abstract class TaskAttemptImpl im TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), taskAttempt.launchTime, 
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), - taskAttempt.shufflePort, taskAttempt.containerID); + taskAttempt.shufflePort, taskAttempt.containerID, + taskAttempt.locality.toString(), taskAttempt.avataar.toString()); taskAttempt.eventHandler.handle (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase)); taskAttempt.eventHandler.handle @@ -1510,7 +1557,7 @@ public abstract class TaskAttemptImpl im // handling failed map/reduce events. }else { LOG.debug("Not generating HistoryFinish event since start event not " + - "generated for taskAttempt: " + taskAttempt.getID()); + "generated for taskAttempt: " + taskAttempt.getID()); } taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); @@ -1580,7 +1627,7 @@ public abstract class TaskAttemptImpl im taskAttempt.attemptId.getTaskId().getJobId(), tauce)); }else { LOG.debug("Not generating HistoryFinish event since start event not " + - "generated for taskAttempt: " + taskAttempt.getID()); + "generated for taskAttempt: " + taskAttempt.getID()); } taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); @@ -1648,7 +1695,7 @@ public abstract class TaskAttemptImpl im taskAttempt.attemptId.getTaskId().getJobId(), tauce)); }else { LOG.debug("Not generating HistoryFinish event since start event not " + - "generated for taskAttempt: " + taskAttempt.getID()); + "generated for taskAttempt: " + taskAttempt.getID()); } // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure. 
taskAttempt.eventHandler.handle(new TaskTAttemptEvent( Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Jan 29 00:23:27 2013 @@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.jobhi import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.api.records.Avataar; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; @@ -594,8 +595,9 @@ public abstract class TaskImpl implement } // This is always called in the Write Lock - private void addAndScheduleAttempt() { + private void addAndScheduleAttempt(Avataar avataar) { TaskAttempt attempt = createAttempt(); + ((TaskAttemptImpl) attempt).setAvataar(avataar); if (LOG.isDebugEnabled()) { LOG.debug("Created attempt " + attempt.getID()); } @@ -749,7 +751,7 @@ public abstract class TaskImpl implement @Override public void 
transition(TaskImpl task, TaskEvent event) { - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(Avataar.VIRGIN); task.scheduledTime = task.clock.getTime(); TaskStartedEvent tse = new TaskStartedEvent( TypeConverter.fromYarn(task.taskId), task.getLaunchTime(), @@ -772,7 +774,7 @@ public abstract class TaskImpl implement @Override public void transition(TaskImpl task, TaskEvent event) { LOG.info("Scheduling a redundant attempt for task " + task.taskId); - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(Avataar.SPECULATIVE); } } @@ -849,7 +851,7 @@ public abstract class TaskImpl implement task.finishedAttempts.add(taskAttemptId); task.inProgressAttempts.remove(taskAttemptId); if (task.successfulAttempt == null) { - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(Avataar.VIRGIN); } } } @@ -937,7 +939,7 @@ public abstract class TaskImpl implement task.inProgressAttempts.remove(taskAttemptId); if (task.inProgressAttempts.size() == 0 && task.successfulAttempt == null) { - task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(Avataar.VIRGIN); } } else { task.handleTaskAttemptCompletion( @@ -1053,7 +1055,7 @@ public abstract class TaskImpl implement // from the map splitInfo. So the bad node might be sent as a location // to the RM. But the RM would ignore that just like it would ignore // currently pending container requests affinitized to bad nodes. 
- task.addAndScheduleAttempt(); + task.addAndScheduleAttempt(Avataar.VIRGIN); return TaskStateInternal.SCHEDULED; } } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Jan 29 00:23:27 2013 @@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1439715&r1=1439714&r2=1439715&view=diff 
============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Jan 29 00:23:27 2013 @@ -33,6 +33,9 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRConfig; @@ -66,6 +69,7 @@ import org.apache.hadoop.yarn.SystemCloc import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -105,6 +109,13 @@ public class TestJobImpl { Configuration conf = new Configuration(); conf.setInt(MRJobConfig.NUM_REDUCES, 0); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + conf.set(MRJobConfig.WORKFLOW_ID, "testId"); + conf.set(MRJobConfig.WORKFLOW_NAME, "testName"); + conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName"); + conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1"); + conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2"); + + AsyncDispatcher dispatcher = new AsyncDispatcher(); 
dispatcher.init(conf); dispatcher.start(); @@ -114,6 +125,9 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); + JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId", + "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" "); + dispatcher.register(EventType.class, jseHandler); JobImpl job = createStubbedJob(conf, dispatcher, 0); job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); @@ -121,6 +135,11 @@ public class TestJobImpl { assertJobState(job, JobStateInternal.SUCCEEDED); dispatcher.stop(); commitHandler.stop(); + try { + Assert.assertTrue(jseHandler.getAssertValue()); + } catch (InterruptedException e) { + Assert.fail("Workflow related attributes are not tested properly"); + } } @Test(timeout=20000) @@ -614,6 +633,67 @@ public class TestJobImpl { Assert.assertEquals(state, job.getInternalState()); } + private static class JobSubmittedEventHandler implements + EventHandler<JobHistoryEvent> { + + private String workflowId; + + private String workflowName; + + private String workflowNodeName; + + private String workflowAdjacencies; + + private Boolean assertBoolean; + + public JobSubmittedEventHandler(String workflowId, String workflowName, + String workflowNodeName, String workflowAdjacencies) { + this.workflowId = workflowId; + this.workflowName = workflowName; + this.workflowNodeName = workflowNodeName; + this.workflowAdjacencies = workflowAdjacencies; + assertBoolean = null; + } + + @Override + public void handle(JobHistoryEvent jhEvent) { + if (jhEvent.getType() != EventType.JOB_SUBMITTED) { + return; + } + JobSubmittedEvent jsEvent = (JobSubmittedEvent) jhEvent.getHistoryEvent(); + if (!workflowId.equals(jsEvent.getWorkflowId())) { + setAssertValue(false); + return; + } + if (!workflowName.equals(jsEvent.getWorkflowName())) { + setAssertValue(false); + return; + } + if (!workflowNodeName.equals(jsEvent.getWorkflowNodeName())) { + 
setAssertValue(false); + return; + } + if (!workflowAdjacencies.equals(jsEvent.getWorkflowAdjacencies())) { + setAssertValue(false); + return; + } + setAssertValue(true); + } + + private synchronized void setAssertValue(Boolean bool) { + assertBoolean = bool; + notify(); + } + + public synchronized boolean getAssertValue() throws InterruptedException { + while (assertBoolean == null) { + wait(); + } + return assertBoolean; + } + + } + private static class StubbedJob extends JobImpl { //override the init transition private final InitTransition initTransition; Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Tue Jan 29 00:23:27 2013 @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.jobhi import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.Locality; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import 
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; @@ -157,6 +158,7 @@ public class TestTaskAttempt{ createMapTaskAttemptImplForTest(eventHandler, splitInfo); TaskAttemptImpl spyTa = spy(mockTaskAttempt); when(spyTa.resolveHost(hosts[0])).thenReturn("host1"); + spyTa.dataLocalHosts = spyTa.resolveHosts(splitInfo.getLocations()); TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class); rct.transition(spyTa, mockTAEvent); @@ -360,6 +362,8 @@ public class TestTaskAttempt{ taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)); assertFalse(eventHandler.internalError); + assertEquals("Task attempt is not assigned on the local node", + Locality.NODE_LOCAL, taImpl.getLocality()); } @Test @@ -398,7 +402,7 @@ public class TestTaskAttempt{ mock(Token.class), new Credentials(), new SystemClock(), appCtx); - NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + NodeId nid = BuilderUtils.newNodeId("127.0.0.2", 0); ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); @@ -416,6 +420,8 @@ public class TestTaskAttempt{ TaskAttemptEventType.TA_CONTAINER_CLEANED)); assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", eventHandler.internalError); + assertEquals("Task attempt is not assigned on the local rack", + Locality.RACK_LOCAL, taImpl.getLocality()); } @Test @@ -439,7 +445,7 @@ public class TestTaskAttempt{ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); - when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + when(splits.getLocations()).thenReturn(new String[] {}); AppContext appCtx = mock(AppContext.class); ClusterInfo clusterInfo = mock(ClusterInfo.class); @@ -475,6 +481,8 @@ public class TestTaskAttempt{ TaskAttemptEventType.TA_CONTAINER_CLEANED)); assertFalse("InternalError occurred trying to handle 
TA_CONTAINER_CLEANED", eventHandler.internalError); + assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH, + taImpl.getLocality()); } @Test Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Jan 29 00:23:27 2013 @@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.TaskUmbi import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; +import org.apache.hadoop.mapreduce.v2.api.records.Avataar; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; @@ -46,10 +47,12 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal; import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; @@ -254,6 +257,7 @@ public class TestTaskImpl { mockTask.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE)); assertTaskScheduledState(); + assertTaskAttemptAvataar(Avataar.VIRGIN); } private void killTask(TaskId taskId) { @@ -338,6 +342,19 @@ public class TestTaskImpl { private void assertTaskSucceededState() { assertEquals(TaskState.SUCCEEDED, mockTask.getState()); } + + /** + * {@link Avataar} + */ + private void assertTaskAttemptAvataar(Avataar avataar) { + for (TaskAttempt taskAttempt : mockTask.getAttempts().values()) { + if (((TaskAttemptImpl) taskAttempt).getAvataar() == avataar) { + return; + } + } + fail("There is no " + (avataar == Avataar.VIRGIN ? 
"virgin" : "speculative") + + "task attempt"); + } @Test public void testInit() { @@ -516,6 +533,9 @@ public class TestTaskImpl { // The task should still be in the succeeded state assertTaskSucceededState(); + + // The task should contain speculative a task attempt + assertTaskAttemptAvataar(Avataar.SPECULATIVE); } @Test Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Tue Jan 29 00:23:27 2013 @@ -91,7 +91,11 @@ "values": "string" } }, - {"name": "jobQueueName", "type": "string"} + {"name": "jobQueueName", "type": "string"}, + {"name": "workflowId", "type": "string"}, + {"name": "workflowName", "type": "string"}, + {"name": "workflowNodeName", "type": "string"}, + {"name": "workflowAdjacencies", "type": "string"} ] }, @@ -191,7 +195,9 @@ {"name": "trackerName", "type": "string"}, {"name": "httpPort", "type": "int"}, {"name": "shufflePort", "type": "int"}, - {"name": "containerId", "type": "string"} + {"name": "containerId", "type": "string"}, + {"name": "locality", "type": "string"}, + {"name": "avataar", "type": "string"} ] }, Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Jan 29 00:23:27 2013 @@ -647,5 +647,18 @@ public interface MRJobConfig { "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*", "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*", }; + /** Configuration key holding the workflow id. */ + public static final String WORKFLOW_ID = "mapreduce.workflow.id"; + /** Configuration key holding the workflow name. */ + public static final String WORKFLOW_NAME = "mapreduce.workflow.name"; + /** Configuration key holding the workflow node name. */ + public static final String WORKFLOW_NODE_NAME = + "mapreduce.workflow.node.name"; + /** Prefix of the configuration keys holding workflow adjacencies. */ + public static final String WORKFLOW_ADJACENCY_PREFIX_STRING = + "mapreduce.workflow.adjacency."; + /** Regex matching keys that start with the adjacency prefix. */ + public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN = + "^mapreduce\\.workflow\\.adjacency\\..+"; } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java Tue Jan 29 00:23:27 2013 @@ -52,6 +52,29 @@ public class JobSubmittedEvent implement public JobSubmittedEvent(JobID id, String jobName, String userName, long submitTime, String jobConfPath, Map<JobACL, AccessControlList> jobACLs, String jobQueueName) { + this(id, jobName, userName, submitTime, jobConfPath, jobACLs, + jobQueueName, "", "", "", ""); + } + + /** + * Create an event to record job submission + * @param id The job Id of the job + * @param jobName Name of the job + * @param userName Name of the user who submitted the job + * @param submitTime Time of submission + * @param jobConfPath Path of the Job Configuration file + * @param jobACLs The configured acls for the job. 
+ * @param jobQueueName The job-queue to which this job was submitted + * @param workflowId The Id of the workflow; left unset if null + * @param workflowName The name of the workflow; left unset if null + * @param workflowNodeName The node name of the workflow; left unset if null + * @param workflowAdjacencies The adjacencies of the workflow; left unset if null + */ + public JobSubmittedEvent(JobID id, String jobName, String userName, + long submitTime, String jobConfPath, + Map<JobACL, AccessControlList> jobACLs, String jobQueueName, + String workflowId, String workflowName, String workflowNodeName, + String workflowAdjacencies) { datum.jobid = new Utf8(id.toString()); datum.jobName = new Utf8(jobName); datum.userName = new Utf8(userName); @@ -66,6 +89,18 @@ public class JobSubmittedEvent implement if (jobQueueName != null) { datum.jobQueueName = new Utf8(jobQueueName); } + if (workflowId != null) { + datum.workflowId = new Utf8(workflowId); + } + if (workflowName != null) { + datum.workflowName = new Utf8(workflowName); + } + if (workflowNodeName != null) { + datum.workflowNodeName = new Utf8(workflowNodeName); + } + if (workflowAdjacencies != null) { + datum.workflowAdjacencies = new Utf8(workflowAdjacencies); + } } JobSubmittedEvent() {} @@ -105,6 +140,34 @@ public class JobSubmittedEvent implement } return jobAcls; } + /** Get the id of the workflow, or null if it was not set */ + public String getWorkflowId() { + if (datum.workflowId != null) { + return datum.workflowId.toString(); + } + return null; + } + /** Get the name of the workflow, or null if it was not set */ + public String getWorkflowName() { + if (datum.workflowName != null) { + return datum.workflowName.toString(); + } + return null; + } + /** Get the node name of the workflow, or null if it was not set */ + public String getWorkflowNodeName() { + if (datum.workflowNodeName != null) { + return datum.workflowNodeName.toString(); + } + return null; + } + /** Get the adjacencies of the workflow, or null if they were not set */ + public String getWorkflowAdjacencies() { + if (datum.workflowAdjacencies != null) { + return datum.workflowAdjacencies.toString(); + } + return null; + } 
/** Get the event type */ public EventType getEventType() { return EventType.JOB_SUBMITTED; } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java?rev=1439715&r1=1439714&r2=1439715&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java Tue Jan 29 00:23:27 2013 @@ -46,10 +46,13 @@ public class TaskAttemptStartedEvent imp * @param httpPort The port number of the tracker * @param shufflePort The shuffle port number of the container * @param containerId The containerId for the task attempt. 
+ * @param locality The locality of the task attempt; left unset if null + * @param avataar The avataar of the task attempt; left unset if null */ public TaskAttemptStartedEvent( TaskAttemptID attemptId, TaskType taskType, long startTime, String trackerName, - int httpPort, int shufflePort, ContainerId containerId) { + int httpPort, int shufflePort, ContainerId containerId, + String locality, String avataar) { datum.attemptId = new Utf8(attemptId.toString()); datum.taskid = new Utf8(attemptId.getTaskID().toString()); datum.startTime = startTime; @@ -58,14 +61,21 @@ public class TaskAttemptStartedEvent imp datum.httpPort = httpPort; datum.shufflePort = shufflePort; datum.containerId = new Utf8(containerId.toString()); + if (locality != null) { + datum.locality = new Utf8(locality); + } + if (avataar != null) { + datum.avataar = new Utf8(avataar); + } } // TODO Remove after MrV1 is removed. // Using a dummy containerId to prevent jobHistory parse failures. public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType, - long startTime, String trackerName, int httpPort, int shufflePort) { + long startTime, String trackerName, int httpPort, int shufflePort, + String locality, String avataar) { this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort, - ConverterUtils.toContainerId("container_-1_-1_-1_-1")); + ConverterUtils.toContainerId("container_-1_-1_-1_-1"), locality, avataar); } TaskAttemptStartedEvent() {} @@ -105,4 +115,19 @@ public class TaskAttemptStartedEvent imp public ContainerId getContainerId() { return ConverterUtils.toContainerId(datum.containerId.toString()); } + /** Get the locality, or null if it was not set */ + public String getLocality() { + if (datum.locality != null) { + return datum.locality.toString(); + } + return null; + } + /** Get the avataar, or null if it was not set */ + public String getAvataar() { + if (datum.avataar != null) { + return datum.avataar.toString(); + } + return null; + } + }