Repository: hadoop Updated Branches: refs/heads/YARN-2928 5d2a81a5a -> 5110ff3bb
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5110ff3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java index 1a96e61..e0c593d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -25,7 +25,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -39,78 +38,100 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identif import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for posting application, appattempt & Container * lifecycle related events to timeline service V2 */ @Private @Unstable -public class TimelineServiceV2Publisher extends - AbstractTimelineServicePublisher { - private static final Log LOG = LogFactory - .getLog(TimelineServiceV2Publisher.class); +public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher { + private static final Log LOG = + LogFactory.getLog(TimelineServiceV2Publisher.class); protected RMTimelineCollectorManager rmTimelineCollectorManager; + private boolean publishContainerMetrics; public TimelineServiceV2Publisher(RMContext rmContext) { super("TimelineserviceV2Publisher"); rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager(); } - private boolean publishContainerMetrics; - @Override - protected void serviceInit(Configuration conf) throws Exception { - publishContainerMetrics = - conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, - YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED); - super.serviceInit(conf); + protected void serviceStart() throws Exception { + super.serviceStart(); + publishContainerMetrics = getConfig().getBoolean( + YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, + YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED); + getDispatcher().register(SystemMetricsEventType.class, + new TimelineV2EventHandler()); } + @VisibleForTesting + boolean isPublishContainerMetrics() { + return publishContainerMetrics; + } + + @SuppressWarnings("unchecked") @Override - void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { - ApplicationEntity entity = - createApplicationEntity(event.getApplicationId()); - entity.setQueue(event.getQueue()); + public void appCreated(RMApp app, long createdTime) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + entity.setQueue(app.getQueue()); + entity.setCreatedTime(createdTime); + Map<String, Object> entityInfo = new HashMap<String, Object>(); - entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, - event.getApplicationName()); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName()); entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, - event.getApplicationType()); - entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, - event.getUser()); + app.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser()); entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, - event.getSubmittedTime()); + app.getSubmitTime()); entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, - event.getAppTags()); + app.getApplicationTags()); entityInfo.put( - ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, - event.isUnmanagedApp()); + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, + app.getApplicationSubmissionContext().getUnmanagedAM()); entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - event.getApplicationPriority().getPriority()); + app.getApplicationSubmissionContext().getPriority().getPriority()); + entity.getConfigs().put( + ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, + app.getAmNodeLabelExpression()); + entity.getConfigs().put( + ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, + app.getAppNodeLabelExpression()); entity.setInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(createdTime); entity.addEvent(tEvent); - putEntity(entity, event.getApplicationId()); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } + @SuppressWarnings("unchecked") @Override - void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { - ApplicationEntity entity = - createApplicationEntity(event.getApplicationId()); - RMAppMetrics appMetrics = event.getAppMetrics(); + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + RMAppMetrics appMetrics = app.getRMAppMetrics(); entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS, appMetrics.getVcoreSeconds()); entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS, @@ -118,54 +139,57 @@ public class TimelineServiceV2Publisher extends TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(finishedTime); Map<String, Object> eventInfo = new HashMap<String, Object>(); eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event - .getFinalApplicationStatus().toString()); - eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event - .getYarnApplicationState().toString()); - if (event.getLatestApplicationAttemptId() != null) { + app.getDiagnostics().toString()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + RMServerUtils.createApplicationState(state).toString()); + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() == null + ? null : app.getCurrentAppAttempt().getAppAttemptId(); + if (appAttemptId != null) { eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, - event.getLatestApplicationAttemptId().toString()); + appAttemptId.toString()); } tEvent.setInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity, event.getApplicationId()); - //cleaning up the collector cached - event.getApp().stopTimelineCollector(); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } + @SuppressWarnings("unchecked") @Override - void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { - ApplicationEntity entity = - createApplicationEntity(event.getApplicationId()); + public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + appViewACLs); + entity.setInfo(entityInfo); + + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appUpdated(RMApp app, long currentTimeMillis) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); Map<String, Object> eventInfo = new HashMap<String, Object>(); eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event - .getApplicationPriority().getPriority()); + app.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + app.getApplicationSubmissionContext().getPriority().getPriority()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(currentTimeMillis); tEvent.setInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity, event.getApplicationId()); - } - - @Override - void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) { - ApplicationEntity entity = - createApplicationEntity(event.getApplicationId()); - Map<String, Object> entityInfo = new HashMap<String, Object>(); - entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, - event.getViewAppACLs()); - entity.setInfo(entityInfo); - - putEntity(entity, event.getApplicationId()); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } private static ApplicationEntity createApplicationEntity( @@ -175,104 +199,135 @@ public class TimelineServiceV2Publisher extends return entity; } + @SuppressWarnings("unchecked") @Override - void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); + createAppAttemptEntity(appAttempt.getAppAttemptId()); + entity.setCreatedTime(registeredTime); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(registeredTime); Map<String, Object> eventInfo = new HashMap<String, Object>(); eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); + appAttempt.getTrackingUrl()); eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost()); + appAttempt.getOriginalTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, + appAttempt.getHost()); eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, - event.getRpcPort()); - eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event - .getMasterContainerId().toString()); + appAttempt.getRpcPort()); + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + appAttempt.getMasterContainer().getId().toString()); tEvent.setInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity, event.getApplicationAttemptId().getApplicationId()); + getDispatcher().getEventHandler().handle( + new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); } + @SuppressWarnings("unchecked") @Override - void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + ApplicationAttemptEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); + createAppAttemptEntity(appAttempt.getAppAttemptId()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(finishedTime); Map<String, Object> eventInfo = new HashMap<String, Object>(); eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); + appAttempt.getTrackingUrl()); eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); + appAttempt.getOriginalTrackingUrl()); eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event - .getFinalApplicationStatus().toString()); - eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event - .getYarnApplicationAttemptState().toString()); + appAttempt.getDiagnostics()); + // app will get the final status from app attempt, or create one + // based on app state if it doesn't exist + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils + .createApplicationAttemptState(appAttemtpState).toString()); tEvent.setInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity, event.getApplicationAttemptId().getApplicationId()); + getDispatcher().getEventHandler().handle( + new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); } - @Override - void publishContainerCreatedEvent(ContainerCreatedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - // updated as event info instead of entity info, as entity info is updated - // by NM - Map<String, Object> eventInfo = new HashMap<String, Object>(); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event - .getAllocatedResource().getMemory()); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event - .getAllocatedResource().getVirtualCores()); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event - .getAllocatedNode().getHost()); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event - .getAllocatedNode().getPort()); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, - event.getAllocatedPriority().getPriority()); - eventInfo.put( - ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, - event.getNodeHttpAddress()); - tEvent.setInfo(eventInfo); - - entity.addEvent(tEvent); - putEntity(entity, event.getContainerId().getApplicationAttemptId() - .getApplicationId()); + private static ApplicationAttemptEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + ApplicationAttemptEntity entity = new ApplicationAttemptEntity(); + entity.setId(appAttemptId.toString()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(), + appAttemptId.getApplicationId().toString())); + return entity; } + @SuppressWarnings("unchecked") @Override - void publishContainerFinishedEvent(ContainerFinishedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map<String, Object> eventInfo = new HashMap<String, Object>(); - eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, - event.getContainerExitStatus()); - eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event - .getContainerState().toString()); - tEvent.setInfo(eventInfo); + public void containerCreated(RMContainer container, long createdTime) { + if (publishContainerMetrics) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); + entity.setCreatedTime(createdTime); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(createdTime); + // updated as event info instead of entity info, as entity info is updated + // by NM + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + container.getAllocatedResource().getMemory()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + container.getAllocatedResource().getVirtualCores()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + container.getAllocatedNode().getHost()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + container.getAllocatedNode().getPort()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + container.getAllocatedPriority().getPriority()); + eventInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + container.getNodeHttpAddress()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); + } + } - entity.addEvent(tEvent); - putEntity(entity, event.getContainerId().getApplicationAttemptId() - .getApplicationId()); + @SuppressWarnings("unchecked") + @Override + public void containerFinished(RMContainer container, long finishedTime) { + if (publishContainerMetrics) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + container.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + container.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, + container.getContainerState().toString()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); + } } private static ContainerEntity createContainerEntity(ContainerId containerId) { @@ -300,17 +355,48 @@ public class TimelineServiceV2Publisher extends } } - private static ApplicationAttemptEntity createAppAttemptEntity( - ApplicationAttemptId appAttemptId) { - ApplicationAttemptEntity entity = new ApplicationAttemptEntity(); - entity.setId(appAttemptId.toString()); - entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(), - appAttemptId.getApplicationId().toString())); - return entity; + private class ApplicationFinishPublishEvent extends TimelineV2PublishEvent { + private RMAppImpl app; + + public ApplicationFinishPublishEvent(SystemMetricsEventType type, + TimelineEntity entity, RMAppImpl app) { + super(type, entity, app.getApplicationId()); + this.app = app; + } + + public RMAppImpl getRMAppImpl() { + return app; + } } - @Override - public boolean publishRMContainerMetrics() { - return publishContainerMetrics; + private class TimelineV2EventHandler + implements EventHandler<TimelineV2PublishEvent> { + @Override + public void handle(TimelineV2PublishEvent event) { + switch (event.getType()) { + case PUBLISH_APPLICATION_FINISHED_ENTITY: + putEntity(event.getEntity(), event.getApplicationId()); + ((ApplicationFinishPublishEvent) event).getRMAppImpl() + .stopTimelineCollector(); + break; + default: + putEntity(event.getEntity(), event.getApplicationId()); + break; + } + } + } + + private class TimelineV2PublishEvent extends TimelinePublishEvent { + private TimelineEntity entity; + + public TimelineV2PublishEvent(SystemMetricsEventType type, + TimelineEntity entity, ApplicationId appId) { + super(type, appId); + this.entity = entity; + } + + public TimelineEntity getEntity() { + return entity; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5110ff3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index 0418f37..d6fdb3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistor import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -68,7 +67,7 @@ import org.junit.Test; public class TestSystemMetricsPublisher { private static ApplicationHistoryServer timelineServer; - private static SystemMetricsPublisher metricsPublisher; + private static TimelineServiceV1Publisher metricsPublisher; private static TimelineStore store; @BeforeClass @@ -89,7 +88,7 @@ public class TestSystemMetricsPublisher { timelineServer.start(); store = timelineServer.getTimelineStore(); - metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class)); + metricsPublisher = new TimelineServiceV1Publisher(); metricsPublisher.init(conf); metricsPublisher.start(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5110ff3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index ac20335..20a5b13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -75,7 +74,7 @@ public class TestSystemMetricsPublisherForV2 { TestSystemMetricsPublisherForV2.class.getName() + "-localDir") .getAbsoluteFile(); - private static SystemMetricsPublisher metricsPublisher; + private static TimelineServiceV2Publisher metricsPublisher; private static DrainDispatcher dispatcher = new DrainDispatcher(); private static final String DEFAULT_FLOW_VERSION = "1"; private static final long DEFAULT_FLOW_RUN = 1; @@ -103,10 +102,11 @@ public class TestSystemMetricsPublisherForV2 { rmTimelineCollectorManager.init(conf); rmTimelineCollectorManager.start(); - metricsPublisher = new SystemMetricsPublisher(rmContext) { + dispatcher.init(conf); + dispatcher.start(); + metricsPublisher = new TimelineServiceV2Publisher(rmContext) { @Override - Dispatcher createDispatcher( - TimelineServicePublisher timelineServicePublisher) { + protected Dispatcher getDispatcher() { return dispatcher; } }; @@ -150,8 +150,8 @@ public class TestSystemMetricsPublisherForV2 { @Test public void testSystemMetricPublisherInitialization() { @SuppressWarnings("resource") - SystemMetricsPublisher metricsPublisher = - new SystemMetricsPublisher(mock(RMContext.class)); + TimelineServiceV2Publisher metricsPublisher = + new TimelineServiceV2Publisher(mock(RMContext.class)); try { Configuration conf = getTimelineV2Conf(); conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, @@ -163,20 +163,18 @@ public class TestSystemMetricsPublisherForV2 { metricsPublisher.stop(); - metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class)); + metricsPublisher = new TimelineServiceV2Publisher(mock(RMContext.class)); conf = getTimelineV2Conf(); metricsPublisher.init(conf); + metricsPublisher.start(); assertTrue("Expected to publish container Metrics from RM", metricsPublisher.isPublishContainerMetrics()); - assertTrue( - "MultiThreadedDispatcher expected when container Metrics is not published", - metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher); } finally { metricsPublisher.stop(); } } - @Test(timeout = 1000000) + @Test(timeout = 10000) public void testPublishApplicationMetrics() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); RMApp app = createAppAndRegister(appId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5110ff3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 7637410..a642a78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;