MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history events and counters. Contributed by Junping Du.
(cherry picked from commit 5eeb2b156f8e108205945f0a1d06873cb51c3527) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e58f943 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e58f943 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e58f943 Branch: refs/heads/YARN-2928 Commit: 8e58f943ffbb9b6c02422984ee2c77ae45948c9e Parents: 8fd1aa1 Author: Zhijie Shen <zjs...@apache.org> Authored: Tue Apr 21 16:31:33 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Tue Aug 25 10:47:10 2015 -0700 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 15 ++ .../jobhistory/JobHistoryEventHandler.java | 258 ++++++++++++++++--- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 23 ++ .../v2/app/rm/RMContainerAllocator.java | 9 + .../hadoop/mapreduce/jobhistory/TestEvents.java | 8 +- .../jobhistory/TestJobHistoryEventHandler.java | 9 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 5 + .../mapreduce/jobhistory/AMStartedEvent.java | 18 ++ .../mapreduce/jobhistory/HistoryEvent.java | 4 + .../mapreduce/jobhistory/JobFinishedEvent.java | 25 ++ .../jobhistory/JobInfoChangeEvent.java | 11 + .../mapreduce/jobhistory/JobInitedEvent.java | 14 + .../jobhistory/JobPriorityChangeEvent.java | 10 + .../jobhistory/JobQueueChangeEvent.java | 10 + .../jobhistory/JobStatusChangedEvent.java | 10 + .../mapreduce/jobhistory/JobSubmittedEvent.java | 23 ++ .../JobUnsuccessfulCompletionEvent.java | 16 ++ .../jobhistory/MapAttemptFinishedEvent.java | 24 +- .../jobhistory/NormalizedResourceEvent.java | 11 + .../jobhistory/ReduceAttemptFinishedEvent.java | 25 +- .../jobhistory/TaskAttemptFinishedEvent.java | 19 ++ .../jobhistory/TaskAttemptStartedEvent.java | 18 ++ .../TaskAttemptUnsuccessfulCompletionEvent.java | 24 ++ .../mapreduce/jobhistory/TaskFailedEvent.java | 19 ++ .../mapreduce/jobhistory/TaskFinishedEvent.java | 18 ++ 
.../mapreduce/jobhistory/TaskStartedEvent.java | 12 + .../mapreduce/jobhistory/TaskUpdatedEvent.java | 10 + .../mapreduce/util/JobHistoryEventUtils.java | 51 ++++ .../src/main/resources/mapred-default.xml | 7 + .../mapred/TestMRTimelineEventHandling.java | 163 +++++++++++- .../hadoop/mapreduce/v2/MiniMRYarnCluster.java | 21 +- 31 files changed, 838 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 305b29e..5ac0d3b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1,5 +1,20 @@ Hadoop MapReduce Change Log +Branch YARN-2928: Timeline Server Next Generation: Phase 1 + + INCOMPATIBLE CHANGES + + NEW FEATURES + + MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history + events and counters. 
(Junping Du via zjshen) + + IMPROVEMENTS + + OPTIMIZATIONS + + BUG FIXES + Trunk (Unreleased) INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index b0bcfcd..1e84e44 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -19,6 +19,9 @@ package org.apache.hadoop.mapreduce.jobhistory; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -49,11 +52,13 @@ import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; +import 
org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; @@ -68,10 +73,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.node.ArrayNode; -import org.codehaus.jackson.node.ObjectNode; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.annotations.VisibleForTesting; /** @@ -119,14 +122,24 @@ public class JobHistoryEventHandler extends AbstractService protected static final Map<JobId, MetaInfo> fileMap = Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>()); + + // For posting entities in new timeline service in a non-blocking way + // TODO YARN-3367 replace with event loop in TimelineClient. + private static ExecutorService threadPool = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); // should job completion be force when the AM shuts down? 
protected volatile boolean forceJobCompletion = false; protected TimelineClient timelineClient; + + private boolean newTimelineServiceEnabled = false; private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; + private static String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = "MAPREDUCE_TASK_ATTEMPT"; public JobHistoryEventHandler(AppContext context, int startCount) { super("JobHistoryEventHandler"); @@ -246,13 +259,22 @@ public class JobHistoryEventHandler extends AbstractService MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD); + // TODO replace MR specific configurations on timeline service with getting + // configuration from RM through registerApplicationMaster() in + // ApplicationMasterProtocol with return value for timeline service + // configuration status: off, on_with_v1 or on_with_v2. if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) { if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - timelineClient = TimelineClient.createTimelineClient(); + + timelineClient = + ((MRAppMaster.RunningAppContext)context).getTimelineClient(); timelineClient.init(conf); - LOG.info("Timeline service is enabled"); + newTimelineServiceEnabled = conf.getBoolean( + MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, + MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); + LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? 
"v2" : "v1")); LOG.info("Emitting job history data to the timeline server is enabled"); } else { LOG.info("Timeline service is not enabled"); @@ -426,9 +448,26 @@ public class JobHistoryEventHandler extends AbstractService if (timelineClient != null) { timelineClient.stop(); } + shutdownAndAwaitTermination(); LOG.info("Stopped JobHistoryEventHandler. super.stop()"); super.serviceStop(); } + + // TODO remove threadPool after adding non-blocking call in TimelineClient + private static void shutdownAndAwaitTermination() { + threadPool.shutdown(); + try { + if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) + LOG.error("ThreadPool did not terminate"); + } + } catch (InterruptedException ie) { + threadPool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } protected EventWriter createEventWriter(Path historyFilePath) throws IOException { @@ -583,8 +622,13 @@ public class JobHistoryEventHandler extends AbstractService processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); if (timelineClient != null) { - processEventForTimelineServer(historyEvent, event.getJobID(), - event.getTimestamp()); + if (newTimelineServiceEnabled) { + processEventForNewTimelineService(historyEvent, event.getJobID(), + event.getTimestamp()); + } else { + processEventForTimelineServer(historyEvent, event.getJobID(), + event.getTimestamp()); + } } if (LOG.isDebugEnabled()) { LOG.debug("In HistoryEventHandler " @@ -825,11 +869,11 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps()); tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces()); tEvent.addEventInfo("MAP_COUNTERS_GROUPS", - countersToJSON(jfe.getTotalCounters())); + JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters())); tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS", - 
countersToJSON(jfe.getReduceCounters())); + JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters())); tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS", - countersToJSON(jfe.getTotalCounters())); + JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters())); tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString()); tEntity.addEvent(tEvent); tEntity.setEntityId(jobId.toString()); @@ -855,7 +899,7 @@ public class JobHistoryEventHandler extends AbstractService tfe.getFailedAttemptID() == null ? "" : tfe.getFailedAttemptID().toString()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tfe.getCounters())); + JobHistoryEventUtils.countersToJSON(tfe.getCounters())); tEntity.addEvent(tEvent); tEntity.setEntityId(tfe.getTaskId().toString()); tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); @@ -873,7 +917,7 @@ public class JobHistoryEventHandler extends AbstractService TaskFinishedEvent tfe2 = (TaskFinishedEvent) event; tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tfe2.getCounters())); + JobHistoryEventUtils.countersToJSON(tfe2.getCounters())); tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime()); tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString()); tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID", @@ -895,7 +939,6 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("START_TIME", tase.getStartTime()); tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort()); tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName()); - tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString()); tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort()); tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ? 
"" : tase.getContainerId().toString()); @@ -928,7 +971,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime()); tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tauce.getCounters())); + JobHistoryEventUtils.countersToJSON(tauce.getCounters())); tEntity.addEvent(tEvent); tEntity.setEntityId(tauce.getTaskId().toString()); tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); @@ -942,7 +985,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("STATE", mafe.getState()); tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(mafe.getCounters())); + JobHistoryEventUtils.countersToJSON(mafe.getCounters())); tEvent.addEventInfo("HOSTNAME", mafe.getHostname()); tEvent.addEventInfo("PORT", mafe.getPort()); tEvent.addEventInfo("RACK_NAME", mafe.getRackName()); @@ -964,7 +1007,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime()); tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(rafe.getCounters())); + JobHistoryEventUtils.countersToJSON(rafe.getCounters())); tEvent.addEventInfo("HOSTNAME", rafe.getHostname()); tEvent.addEventInfo("PORT", rafe.getPort()); tEvent.addEventInfo("RACK_NAME", rafe.getRackName()); @@ -983,7 +1026,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("STATUS", tafe.getTaskStatus()); tEvent.addEventInfo("STATE", tafe.getState()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tafe.getCounters())); + JobHistoryEventUtils.countersToJSON(tafe.getCounters())); tEvent.addEventInfo("HOSTNAME", tafe.getHostname()); tEntity.addEvent(tEvent); tEntity.setEntityId(tafe.getTaskId().toString()); @@ -1010,37 +1053,172 @@ public class 
JobHistoryEventHandler extends AbstractService default: break; } - + try { timelineClient.putEntities(tEntity); - } catch (IOException ex) { - LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline" - + "Server", ex); - } catch (YarnException ex) { + } catch (IOException|YarnException ex) { LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline" + "Server", ex); } } - - @Private - public JsonNode countersToJSON(Counters counters) { - ObjectMapper mapper = new ObjectMapper(); - ArrayNode nodes = mapper.createArrayNode(); - if (counters != null) { - for (CounterGroup counterGroup : counters) { - ObjectNode groupNode = nodes.addObject(); - groupNode.put("NAME", counterGroup.getName()); - groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName()); - ArrayNode countersNode = groupNode.putArray("COUNTERS"); - for (Counter counter : counterGroup) { - ObjectNode counterNode = countersNode.addObject(); - counterNode.put("NAME", counter.getName()); - counterNode.put("DISPLAY_NAME", counter.getDisplayName()); - counterNode.put("VALUE", counter.getValue()); + + private void putEntityWithoutBlocking(final TimelineClient timelineClient, + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) { + Runnable publishWrapper = new Runnable() { + public void run() { + try { + timelineClient.putEntities(entity); + } catch (IOException|YarnException e) { + LOG.error("putEntityNonBlocking get failed: " + e); + throw new RuntimeException(e.toString()); } } + }; + threadPool.execute(publishWrapper); + } + + // create JobEntity from HistoryEvent with adding other info, like: + // jobId, timestamp and entityType. 
+ private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createJobEntity(HistoryEvent event, long timestamp, JobId jobId, + String entityType) { + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType); + entity.setId(jobId.toString()); + return entity; + } + + // create BaseEntity from HistoryEvent with adding other info, like: + // timestamp and entityType. + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createBaseEntity(HistoryEvent event, long timestamp, String entityType) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent = + event.toTimelineEvent(); + tEvent.setTimestamp(timestamp); + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); + entity.addEvent(tEvent); + entity.setType(entityType); + return entity; + } + + // create TaskEntity from HistoryEvent with adding other info, like: + // taskId, jobId, timestamp, entityType and relatedJobEntity. + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createTaskEntity(HistoryEvent event, long timestamp, String taskId, + String entityType, String relatedJobEntity, JobId jobId) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType); + entity.setId(taskId); + entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString()); + return entity; + } + + // create TaskAttemptEntity from HistoryEvent with adding other info, like: + // timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId. 
+ private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createTaskAttemptEntity(HistoryEvent event, long timestamp, + String taskAttemptId, String entityType, String relatedTaskEntity, + String taskId) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType); + entity.setId(taskAttemptId); + entity.addIsRelatedToEntity(relatedTaskEntity, taskId); + return entity; + } + + private void processEventForNewTimelineService(HistoryEvent event, JobId jobId, + long timestamp) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = null; + String taskId = null; + String taskAttemptId = null; + + switch (event.getEventType()) { + // Handle job events + case JOB_SUBMITTED: + case JOB_STATUS_CHANGED: + case JOB_INFO_CHANGED: + case JOB_INITED: + case JOB_PRIORITY_CHANGED: + case JOB_QUEUE_CHANGED: + case JOB_FAILED: + case JOB_KILLED: + case JOB_ERROR: + case JOB_FINISHED: + case AM_STARTED: + case NORMALIZED_RESOURCE: + break; + // Handle task events + case TASK_STARTED: + taskId = ((TaskStartedEvent)event).getTaskId().toString(); + break; + case TASK_FAILED: + taskId = ((TaskFailedEvent)event).getTaskId().toString(); + break; + case TASK_UPDATED: + taskId = ((TaskUpdatedEvent)event).getTaskId().toString(); + break; + case TASK_FINISHED: + taskId = ((TaskFinishedEvent)event).getTaskId().toString(); + break; + case MAP_ATTEMPT_STARTED: + case CLEANUP_ATTEMPT_STARTED: + case REDUCE_ATTEMPT_STARTED: + case SETUP_ATTEMPT_STARTED: + taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptStartedEvent)event). 
+ getTaskAttemptId().toString(); + break; + case MAP_ATTEMPT_FAILED: + case CLEANUP_ATTEMPT_FAILED: + case REDUCE_ATTEMPT_FAILED: + case SETUP_ATTEMPT_FAILED: + case MAP_ATTEMPT_KILLED: + case CLEANUP_ATTEMPT_KILLED: + case REDUCE_ATTEMPT_KILLED: + case SETUP_ATTEMPT_KILLED: + taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event). + getTaskAttemptId().toString(); + break; + case MAP_ATTEMPT_FINISHED: + taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString(); + taskAttemptId = ((MapAttemptFinishedEvent)event).getAttemptId().toString(); + break; + case REDUCE_ATTEMPT_FINISHED: + taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString(); + taskAttemptId = ((ReduceAttemptFinishedEvent)event).getAttemptId().toString(); + break; + case SETUP_ATTEMPT_FINISHED: + case CLEANUP_ATTEMPT_FINISHED: + taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptFinishedEvent)event).getAttemptId().toString(); + break; + default: + LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" + + " and handled by timeline service."); + return; } - return nodes; + if (taskId == null) { + // JobEntity + tEntity = createJobEntity(event, timestamp, jobId, + MAPREDUCE_JOB_ENTITY_TYPE); + } else { + if (taskAttemptId == null) { + // TaskEntity + tEntity = createTaskEntity(event, timestamp, taskId, + MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, jobId); + } else { + // TaskAttemptEntity + tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId, + MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE, + taskId); + } + } + + putEntityWithoutBlocking(timelineClient, tEntity); } private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 6dc830f..dafb6e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -135,6 +135,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -1004,6 +1005,7 @@ public class MRAppMaster extends CompositeService { private final Configuration conf; private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; + private TimelineClient timelineClient = null; private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; @@ -1013,6 +1015,23 @@ public class MRAppMaster extends CompositeService { this.clientToAMTokenSecretManager = new ClientToAMTokenSecretManager(appAttemptID, null); this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor; + if 
(conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, + MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA) + && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + + boolean newTimelineServiceEnabled = conf.getBoolean( + MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, + MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); + + if (newTimelineServiceEnabled) { + // create new version TimelineClient + timelineClient = TimelineClient.createTimelineClient( + appAttemptID.getApplicationId()); + } else { + timelineClient = TimelineClient.createTimelineClient(); + } + } } @Override @@ -1103,6 +1122,10 @@ public class MRAppMaster extends CompositeService { return taskAttemptFinishingMonitor; } + // Get Timeline Collector's address (get sync from RM) + public TimelineClient getTimelineClient() { + return timelineClient; + } } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index ac4c586..e5338b5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; 
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; @@ -769,6 +770,14 @@ public class RMContainerAllocator extends RMContainerRequestor computeIgnoreBlacklisting(); handleUpdatedNodes(response); + String collectorAddr = response.getCollectorAddr(); + MRAppMaster.RunningAppContext appContext = + (MRAppMaster.RunningAppContext)this.getContext(); + if (collectorAddr != null && !collectorAddr.isEmpty() + && appContext.getTimelineClient() != null) { + appContext.getTimelineClient().setTimelineServiceAddress( + response.getCollectorAddr()); + } for (ContainerStatus cont : finishedContainers) { LOG.info("Received completed container " + cont.getContainerId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java index 7612ceb..e7d5006 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java @@ -35,6 +35,7 @@ 
/**
 * Stub required because {@code HistoryEvent} declares
 * {@code toTimelineEvent()}: this test helper does not build a timeline
 * event and simply returns {@code null}.
 *
 * @return always {@code null}
 */
@Override
public TimelineEvent toTimelineEvent() {
  return null;
}
{ group2.addCounter("MARTHA_JONES", "Martha Jones", 3); group2.addCounter("DONNA_NOBLE", "Donna Noble", 2); group2.addCounter("ROSE_TYLER", "Rose Tyler", 1); - JsonNode jsonNode = jheh.countersToJSON(counters); + JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters); String jsonStr = new ObjectMapper().writeValueAsString(jsonNode); String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions " + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\"" @@ -689,19 +690,19 @@ public class TestJobHistoryEventHandler { public void testCountersToJSONEmpty() throws Exception { JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0); Counters counters = null; - JsonNode jsonNode = jheh.countersToJSON(counters); + JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters); String jsonStr = new ObjectMapper().writeValueAsString(jsonNode); String expected = "[]"; Assert.assertEquals(expected, jsonStr); counters = new Counters(); - jsonNode = jheh.countersToJSON(counters); + jsonNode = JobHistoryEventUtils.countersToJSON(counters); jsonStr = new ObjectMapper().writeValueAsString(jsonNode); expected = "[]"; Assert.assertEquals(expected, jsonStr); counters.addGroup("DOCTORS", "Incarnations of the Doctor"); - jsonNode = jheh.countersToJSON(counters); + jsonNode = JobHistoryEventUtils.countersToJSON(counters); jsonStr = new ObjectMapper().writeValueAsString(jsonNode); expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the " + "Doctor\",\"COUNTERS\":[]}]"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java 
/**
 * Job configuration key {@code mapreduce.job.new-timeline-service.enabled}.
 * NOTE(review): presumably selects the timeline service v2 write path in the
 * MR AM (per the commit subject) -- confirm against the code that reads it.
 */
public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
    "mapreduce.job.new-timeline-service.enabled";
/** Default for {@link #MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED}: off. */
public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
    false;
org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.avro.util.Utf8; @@ -166,4 +168,20 @@ public class AMStartedEvent implements HistoryEvent { public EventType getEventType() { return EventType.AM_STARTED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("APPLICATION_ATTEMPT_ID", + getAppAttemptId() == null ? "" : getAppAttemptId().toString()); + tEvent.addInfo("CONTAINER_ID", getContainerId() == null ? + "" : getContainerId().toString()); + tEvent.addInfo("NODE_MANAGER_HOST", getNodeManagerHost()); + tEvent.addInfo("NODE_MANAGER_PORT", getNodeManagerPort()); + tEvent.addInfo("NODE_MANAGER_HTTP_PORT", getNodeManagerHttpPort()); + tEvent.addInfo("START_TIME", getStartTime()); + return tEvent; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java index a30748c..61ce217 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java +++ 
/**
 * Map this HistoryEvent to a TimelineEvent.
 *
 * @return the timeline representation of this event.
 *     NOTE(review): the test-only implementation in TestEvents returns
 *     {@code null}; confirm whether callers are expected to handle a null
 *     result.
 */
TimelineEvent toTimelineEvent();
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record successful completion of job @@ -133,4 +136,26 @@ public class JobFinishedEvent implements HistoryEvent { public Counters getReduceCounters() { return reduceCounters; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("NUM_MAPS", getFinishedMaps()); + tEvent.addInfo("NUM_REDUCES", getFinishedReduces()); + tEvent.addInfo("FAILED_MAPS", getFailedMaps()); + tEvent.addInfo("FAILED_REDUCES", getFailedReduces()); + tEvent.addInfo("FINISHED_MAPS", getFinishedMaps()); + tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces()); + tEvent.addInfo("MAP_COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getMapCounters())); + tEvent.addInfo("REDUCE_COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getReduceCounters())); + tEvent.addInfo("TOTAL_COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getTotalCounters())); + // TODO replace SUCCEEDED with JobState.SUCCEEDED.toString() + tEvent.addInfo("JOB_STATUS", "SUCCEEDED"); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java index b45f373..ad82443 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -64,5 +66,14 @@ public class JobInfoChangeEvent implements HistoryEvent { public EventType getEventType() { return EventType.JOB_INFO_CHANGED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("SUBMIT_TIME", getSubmitTime()); + tEvent.addInfo("LAUNCH_TIME", getLaunchTime()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java index 5abb40e..3e0f2f7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java @@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -73,4 +75,16 @@ public class JobInitedEvent implements HistoryEvent { } /** Get whether the job's map and reduce stages were combined */ public boolean getUberized() { return datum.getUberized(); } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("START_TIME", getLaunchTime()); + tEvent.addInfo("STATUS", getStatus()); + tEvent.addInfo("TOTAL_MAPS", getTotalMaps()); + tEvent.addInfo("TOTAL_REDUCES", getTotalReduces()); + tEvent.addInfo("UBERIZED", getUberized()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java index 6c32462..5deea0a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -64,5 +66,13 @@ public class JobPriorityChangeEvent implements HistoryEvent { public EventType getEventType() { return EventType.JOB_PRIORITY_CHANGED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("PRIORITY", getPriority().toString()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java index 86078e6..b9dd359 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java @@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory; import 
org.apache.avro.util.Utf8; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; @SuppressWarnings("deprecation") public class JobQueueChangeEvent implements HistoryEvent { @@ -59,5 +61,13 @@ public class JobQueueChangeEvent implements HistoryEvent { } return null; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("QUEUE_NAMES", getJobQueueName()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java index 9e11a55..a4f2da2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -60,5 +62,13 @@ public class 
JobStatusChangedEvent implements HistoryEvent { public EventType getEventType() { return EventType.JOB_STATUS_CHANGED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("STATUS", getStatus()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java index 845f6f7..47b2840 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java @@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -205,5 +207,26 @@ public class JobSubmittedEvent implements HistoryEvent { } /** Get the event type */ public EventType getEventType() { return EventType.JOB_SUBMITTED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + 
tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("SUBMIT_TIME", getSubmitTime()); + tEvent.addInfo("QUEUE_NAME", getJobQueueName()); + tEvent.addInfo("JOB_NAME", getJobName()); + tEvent.addInfo("USER_NAME", getUserName()); + tEvent.addInfo("JOB_CONF_PATH", getJobConfPath()); + tEvent.addInfo("ACLS", getJobAcls()); + tEvent.addInfo("JOB_QUEUE_NAME", getJobQueueName()); + tEvent.addInfo("WORKLFOW_ID", getWorkflowId()); + tEvent.addInfo("WORKFLOW_NAME", getWorkflowName()); + tEvent.addInfo("WORKFLOW_NODE_NAME", getWorkflowNodeName()); + tEvent.addInfo("WORKFLOW_ADJACENCIES", + getWorkflowAdjacencies()); + tEvent.addInfo("WORKFLOW_TAGS", getWorkflowTags()); + + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java index 1b7773d..ea9798c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java @@ -24,6 +24,8 @@ import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; 
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import java.util.Collections; @@ -119,4 +121,18 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent { final CharSequence diagnostics = datum.getDiagnostics(); return diagnostics == null ? NODIAGS : diagnostics.toString(); } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("NUM_MAPS", getFinishedMaps()); + tEvent.addInfo("NUM_REDUCES", getFinishedReduces()); + tEvent.addInfo("JOB_STATUS", getStatus()); + tEvent.addInfo("DIAGNOSTICS", getDiagnostics()); + tEvent.addInfo("FINISHED_MAPS", getFinishedMaps()); + tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java index bc046c7..36737e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java @@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import 
org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record successful completion of a map attempt @@ -33,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskType; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class MapAttemptFinishedEvent implements HistoryEvent { +public class MapAttemptFinishedEvent implements HistoryEvent { private MapAttemptFinished datum = null; @@ -218,4 +221,23 @@ public class MapAttemptFinishedEvent implements HistoryEvent { return physMemKbytes; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("STATE", getState()); + tEvent.addInfo("MAP_FINISH_TIME", getMapFinishTime()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + tEvent.addInfo("HOSTNAME", getHostname()); + tEvent.addInfo("PORT", getPort()); + tEvent.addInfo("RACK_NAME", getRackName()); + tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ? 
+ "" : getAttemptId().toString()); + return tEvent; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java index b8f049c..daa454c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java @@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record the normalized map/reduce requirements. 
@@ -71,4 +73,13 @@ public class NormalizedResourceEvent implements HistoryEvent { public void setDatum(Object datum) { throw new UnsupportedOperationException("Not a seriable object"); } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("MEMORY", "" + getMemory()); + tEvent.addInfo("TASK_TYPE", getTaskType()); + return tEvent; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java index 6644a48..6087c7a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java @@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record successful completion of a reduce attempt @@ -33,7 +36,7 @@ import 
org.apache.hadoop.mapreduce.TaskType; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class ReduceAttemptFinishedEvent implements HistoryEvent { +public class ReduceAttemptFinishedEvent implements HistoryEvent { private ReduceAttemptFinished datum = null; @@ -222,5 +225,25 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { public int[] getPhysMemKbytes() { return physMemKbytes; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ? + "" : getAttemptId().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("STATE", getState()); + tEvent.addInfo("SHUFFLE_FINISH_TIME", getShuffleFinishTime()); + tEvent.addInfo("SORT_FINISH_TIME", getSortFinishTime()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + tEvent.addInfo("HOSTNAME", getHostname()); + tEvent.addInfo("PORT", getPort()); + tEvent.addInfo("RACK_NAME", getRackName()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java index bb7dbe0..c7c4387 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java @@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record successful task completion @@ -135,5 +138,21 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { ? EventType.MAP_ATTEMPT_FINISHED : EventType.REDUCE_ATTEMPT_FINISHED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ? 
+ "" : getAttemptId().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("STATE", getState()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + tEvent.addInfo("HOSTNAME", getHostname()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java index c8c250a..d8baec4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java @@ -23,8 +23,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -132,5 +134,21 @@ public class TaskAttemptStartedEvent implements HistoryEvent { } return null; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + 
tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("TASK_ATTEMPT_ID", + getTaskAttemptId().toString()); + tEvent.addInfo("START_TIME", getStartTime()); + tEvent.addInfo("HTTP_PORT", getHttpPort()); + tEvent.addInfo("TRACKER_NAME", getTrackerName()); + tEvent.addInfo("SHUFFLE_PORT", getShufflePort()); + tEvent.addInfo("CONTAINER_ID", getContainerId() == null ? + "" : getContainerId().toString()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java index 77ee2a0..0bb1358 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java @@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import 
org.apache.hadoop.mapred.ProgressSplitsBlock; @@ -247,5 +250,26 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { public int[] getPhysMemKbytes() { return physMemKbytes; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("TASK_ATTEMPT_ID", getTaskAttemptId() == null ? + "" : getTaskAttemptId().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("ERROR", getError()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("HOSTNAME", getHostname()); + tEvent.addInfo("PORT", getPort()); + tEvent.addInfo("RACK_NAME", getRackName()); + tEvent.addInfo("SHUFFLE_FINISH_TIME", getFinishTime()); + tEvent.addInfo("SORT_FINISH_TIME", getFinishTime()); + tEvent.addInfo("MAP_FINISH_TIME", getFinishTime()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java index 2838f08..5e82dea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java @@ -20,10 +20,14 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -136,5 +140,20 @@ public class TaskFailedEvent implements HistoryEvent { public EventType getEventType() { return EventType.TASK_FAILED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("STATUS", TaskStatus.State.FAILED.toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("ERROR", getError()); + tEvent.addInfo("FAILED_ATTEMPT_ID", + getFailedAttemptID() == null ? 
"" : getFailedAttemptID().toString()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java index d4ec74d..e359e32 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java @@ -21,10 +21,14 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record the successful completion of a task @@ -115,5 +119,19 @@ public class TaskFinishedEvent implements HistoryEvent { return EventType.TASK_FINISHED; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new 
TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString()); + tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID", + getSuccessfulTaskAttemptId() == null ? "" : + getSuccessfulTaskAttemptId().toString()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java index ed53b03..d1b97bf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java @@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record the start of a task @@ -71,5 +73,15 @@ public class TaskStartedEvent implements HistoryEvent { public EventType getEventType() { return EventType.TASK_STARTED; } + + @Override 
+ public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("START_TIME", getStartTime()); + tEvent.addInfo("SPLIT_LOCATIONS", getSplitLocations()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java index 58f4143..b9a389c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -60,5 +62,13 @@ public class TaskUpdatedEvent implements HistoryEvent { public EventType getEventType() { return EventType.TASK_UPDATED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("FINISH_TIME", 
getFinishTime()); + return tEvent; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java new file mode 100644 index 0000000..f4896ff --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java @@ -0,0 +1,51 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.mapreduce.util; + +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; + +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.ObjectNode; + +public class JobHistoryEventUtils { + + public static JsonNode countersToJSON(Counters counters) { + ObjectMapper mapper = new ObjectMapper(); + ArrayNode nodes = mapper.createArrayNode(); + if (counters != null) { + for (CounterGroup counterGroup : counters) { + ObjectNode groupNode = nodes.addObject(); + groupNode.put("NAME", counterGroup.getName()); + groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName()); + ArrayNode countersNode = groupNode.putArray("COUNTERS"); + for (Counter counter : counterGroup) { + ObjectNode counterNode = countersNode.addObject(); + counterNode.put("NAME", counter.getName()); + counterNode.put("DISPLAY_NAME", counter.getDisplayName()); + counterNode.put("VALUE", counter.getValue()); + } + } + } + return nodes; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 6d205c5..c7f61cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -607,6 +607,13 @@ </description> </property> + <property> + 
<name>mapreduce.job.new-timeline-service.enabled</name> + <value>false</value> + <description>Specifies if posting job and task events to new timeline service. + </description> +</property> + <property> <name>mapreduce.input.fileinputformat.split.minsize</name> <value>0</value>