Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 5d2a81a5a -> 5110ff3bb


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5110ff3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
index 1a96e61..e0c593d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,78 +38,100 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identif
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for posting application, appattempt & Container
  * lifecycle related events to timeline service V2
  */
 @Private
 @Unstable
-public class TimelineServiceV2Publisher extends
-    AbstractTimelineServicePublisher {
-  private static final Log LOG = LogFactory
-      .getLog(TimelineServiceV2Publisher.class);
+public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineServiceV2Publisher.class);
   protected RMTimelineCollectorManager rmTimelineCollectorManager;
+  private boolean publishContainerMetrics;
 
   public TimelineServiceV2Publisher(RMContext rmContext) {
     super("TimelineserviceV2Publisher");
     rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
   }
 
-  private boolean publishContainerMetrics;
-
   @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    publishContainerMetrics =
-        conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
-            YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
-    super.serviceInit(conf);
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    publishContainerMetrics = getConfig().getBoolean(
+        YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+        YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
+    getDispatcher().register(SystemMetricsEventType.class,
+        new TimelineV2EventHandler());
   }
 
+  @VisibleForTesting
+  boolean isPublishContainerMetrics() {
+    return publishContainerMetrics;
+  }
+
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    entity.setQueue(event.getQueue());
+  public void appCreated(RMApp app, long createdTime) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
+    entity.setQueue(app.getQueue());
+    entity.setCreatedTime(createdTime);
+
     Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
-        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName());
     entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
-        event.getApplicationType());
-    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
-        event.getUser());
+        app.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
     entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
-        event.getSubmittedTime());
+        app.getSubmitTime());
     entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
-        event.getAppTags());
+        app.getApplicationTags());
     entityInfo.put(
-      ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
-      event.isUnmanagedApp());
+        ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
+        app.getApplicationSubmissionContext().getUnmanagedAM());
     entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
-      event.getApplicationPriority().getPriority());
+        app.getApplicationSubmissionContext().getPriority().getPriority());
+    entity.getConfigs().put(
+        ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
+        app.getAmNodeLabelExpression());
+    entity.getConfigs().put(
+        ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
+        app.getAppNodeLabelExpression());
     entity.setInfo(entityInfo);
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(createdTime);
     entity.addEvent(tEvent);
 
-    putEntity(entity, event.getApplicationId());
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    RMAppMetrics appMetrics = event.getAppMetrics();
+  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
+    RMAppMetrics appMetrics = app.getRMAppMetrics();
     entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
         appMetrics.getVcoreSeconds());
     entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
@@ -118,54 +139,57 @@ public class TimelineServiceV2Publisher extends
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationState().toString());
-    if (event.getLatestApplicationAttemptId() != null) {
+        app.getDiagnostics().toString());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
+        RMServerUtils.createApplicationState(state).toString());
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() == null
+        ? null : app.getCurrentAppAttempt().getAppAttemptId();
+    if (appAttemptId != null) {
       eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
-          event.getLatestApplicationAttemptId().toString());
+          appAttemptId.toString());
     }
     tEvent.setInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationId());
 
-    //cleaning up the collector cached
-    event.getApp().stopTimelineCollector();
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
+  public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
+        appViewACLs);
+    entity.setInfo(entityInfo);
+
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void appUpdated(RMApp app, long currentTimeMillis) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
-    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
-        .getApplicationPriority().getPriority());
+        app.getQueue());
+    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
+        app.getApplicationSubmissionContext().getPriority().getPriority());
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(currentTimeMillis);
     tEvent.setInfo(eventInfo);
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationId());
-  }
-
-  @Override
-  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
-        event.getViewAppACLs());
-    entity.setInfo(entityInfo);
-
-    putEntity(entity, event.getApplicationId());
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
   private static ApplicationEntity createApplicationEntity(
@@ -175,104 +199,135 @@ public class TimelineServiceV2Publisher extends
     return entity;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+  public void appAttemptRegistered(RMAppAttempt appAttempt,
+      long registeredTime) {
     TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
+    entity.setCreatedTime(registeredTime);
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(registeredTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
+        appAttempt.getTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
-    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
+        appAttempt.getOriginalTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
+        appAttempt.getHost());
     eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
-        event.getRpcPort());
-    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event
-        .getMasterContainerId().toString());
+        appAttempt.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
+        appAttempt.getMasterContainer().getId().toString());
     tEvent.setInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+    getDispatcher().getEventHandler().handle(
+        new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+  public void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+
     ApplicationAttemptEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
+        appAttempt.getTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
+        appAttempt.getOriginalTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationAttemptState().toString());
+        appAttempt.getDiagnostics());
+    // app will get the final status from app attempt, or create one
+    // based on app state if it doesn't exist
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
+        .createApplicationAttemptState(appAttemtpState).toString());
     tEvent.setInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+    getDispatcher().getEventHandler().handle(
+        new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
   }
 
-  @Override
-  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    // updated as event info instead of entity info, as entity info is updated
-    // by NM
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event
-        .getAllocatedResource().getMemory());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
-        .getAllocatedResource().getVirtualCores());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
-        .getAllocatedNode().getHost());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
-        .getAllocatedNode().getPort());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
-        event.getAllocatedPriority().getPriority());
-    eventInfo.put(
-        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
-        event.getNodeHttpAddress());
-    tEvent.setInfo(eventInfo);
-
-    entity.addEvent(tEvent);
-    putEntity(entity, event.getContainerId().getApplicationAttemptId()
-        .getApplicationId());
+  private static ApplicationAttemptEntity createAppAttemptEntity(
+      ApplicationAttemptId appAttemptId) {
+    ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
+    entity.setId(appAttemptId.toString());
+    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
+        appAttemptId.getApplicationId().toString()));
+    return entity;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
-        event.getContainerExitStatus());
-    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
-        .getContainerState().toString());
-    tEvent.setInfo(eventInfo);
+  public void containerCreated(RMContainer container, long createdTime) {
+    if (publishContainerMetrics) {
+      TimelineEntity entity = createContainerEntity(container.getContainerId());
+      entity.setCreatedTime(createdTime);
+
+      TimelineEvent tEvent = new TimelineEvent();
+      tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
+      tEvent.setTimestamp(createdTime);
+      // updated as event info instead of entity info, as entity info is updated
+      // by NM
+      Map<String, Object> eventInfo = new HashMap<String, Object>();
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+          container.getAllocatedResource().getMemory());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+          container.getAllocatedResource().getVirtualCores());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+          container.getAllocatedNode().getHost());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+          container.getAllocatedNode().getPort());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+          container.getAllocatedPriority().getPriority());
+      eventInfo.put(
+          ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+          container.getNodeHttpAddress());
+      tEvent.setInfo(eventInfo);
+
+      entity.addEvent(tEvent);
+      getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+          SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+              .getContainerId().getApplicationAttemptId().getApplicationId()));
+    }
+  }
 
-    entity.addEvent(tEvent);
-    putEntity(entity, event.getContainerId().getApplicationAttemptId()
-        .getApplicationId());
+  @SuppressWarnings("unchecked")
+  @Override
+  public void containerFinished(RMContainer container, long finishedTime) {
+    if (publishContainerMetrics) {
+      TimelineEntity entity = createContainerEntity(container.getContainerId());
+
+      TimelineEvent tEvent = new TimelineEvent();
+      tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
+      tEvent.setTimestamp(finishedTime);
+      Map<String, Object> eventInfo = new HashMap<String, Object>();
+      eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+          container.getDiagnosticsInfo());
+      eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+          container.getContainerExitStatus());
+      eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
+          container.getContainerState().toString());
+      tEvent.setInfo(eventInfo);
+
+      entity.addEvent(tEvent);
+      getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+          SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+              .getContainerId().getApplicationAttemptId().getApplicationId()));
+    }
   }
 
   private static ContainerEntity createContainerEntity(ContainerId containerId) {
@@ -300,17 +355,48 @@ public class TimelineServiceV2Publisher extends
     }
   }
 
-  private static ApplicationAttemptEntity createAppAttemptEntity(
-      ApplicationAttemptId appAttemptId) {
-    ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
-    entity.setId(appAttemptId.toString());
-    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
-        appAttemptId.getApplicationId().toString()));
-    return entity;
+  private class ApplicationFinishPublishEvent extends TimelineV2PublishEvent {
+    private RMAppImpl app;
+
+    public ApplicationFinishPublishEvent(SystemMetricsEventType type,
+        TimelineEntity entity, RMAppImpl app) {
+      super(type, entity, app.getApplicationId());
+      this.app = app;
+    }
+
+    public RMAppImpl getRMAppImpl() {
+      return app;
+    }
   }
 
-  @Override
-  public boolean publishRMContainerMetrics() {
-    return publishContainerMetrics;
+  private class TimelineV2EventHandler
+      implements EventHandler<TimelineV2PublishEvent> {
+    @Override
+    public void handle(TimelineV2PublishEvent event) {
+      switch (event.getType()) {
+      case PUBLISH_APPLICATION_FINISHED_ENTITY:
+        putEntity(event.getEntity(), event.getApplicationId());
+        ((ApplicationFinishPublishEvent) event).getRMAppImpl()
+            .stopTimelineCollector();
+        break;
+      default:
+        putEntity(event.getEntity(), event.getApplicationId());
+        break;
+      }
+    }
+  }
+
+  private class TimelineV2PublishEvent extends TimelinePublishEvent {
+    private TimelineEntity entity;
+
+    public TimelineV2PublishEvent(SystemMetricsEventType type,
+        TimelineEntity entity, ApplicationId appId) {
+      super(type, appId);
+      this.entity = entity;
+    }
+
+    public TimelineEntity getEntity() {
+      return entity;
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5110ff3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index 0418f37..d6fdb3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistor
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -68,7 +67,7 @@ import org.junit.Test;
 public class TestSystemMetricsPublisher {
 
   private static ApplicationHistoryServer timelineServer;
-  private static SystemMetricsPublisher metricsPublisher;
+  private static TimelineServiceV1Publisher metricsPublisher;
   private static TimelineStore store;
 
   @BeforeClass
@@ -89,7 +88,7 @@ public class TestSystemMetricsPublisher {
     timelineServer.start();
     store = timelineServer.getTimelineStore();
 
-    metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
+    metricsPublisher = new TimelineServiceV1Publisher();
     metricsPublisher.init(conf);
     metricsPublisher.start();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5110ff3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index ac20335..20a5b13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -75,7 +74,7 @@ public class TestSystemMetricsPublisherForV2 {
       TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
       .getAbsoluteFile();
 
-  private static SystemMetricsPublisher metricsPublisher;
+  private static TimelineServiceV2Publisher metricsPublisher;
   private static DrainDispatcher dispatcher = new DrainDispatcher();
   private static final String DEFAULT_FLOW_VERSION = "1";
   private static final long DEFAULT_FLOW_RUN = 1;
@@ -103,10 +102,11 @@ public class TestSystemMetricsPublisherForV2 {
     rmTimelineCollectorManager.init(conf);
     rmTimelineCollectorManager.start();
 
-    metricsPublisher = new SystemMetricsPublisher(rmContext) {
+    dispatcher.init(conf);
+    dispatcher.start();
+    metricsPublisher = new TimelineServiceV2Publisher(rmContext) {
       @Override
-      Dispatcher createDispatcher(
-          TimelineServicePublisher timelineServicePublisher) {
+      protected Dispatcher getDispatcher() {
         return dispatcher;
       }
     };
@@ -150,8 +150,8 @@ public class TestSystemMetricsPublisherForV2 {
   @Test
   public void testSystemMetricPublisherInitialization() {
     @SuppressWarnings("resource")
-    SystemMetricsPublisher metricsPublisher =
-        new SystemMetricsPublisher(mock(RMContext.class));
+    TimelineServiceV2Publisher metricsPublisher =
+        new TimelineServiceV2Publisher(mock(RMContext.class));
     try {
       Configuration conf = getTimelineV2Conf();
       conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
@@ -163,20 +163,18 @@ public class TestSystemMetricsPublisherForV2 {
 
       metricsPublisher.stop();
 
-      metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
+      metricsPublisher = new TimelineServiceV2Publisher(mock(RMContext.class));
       conf = getTimelineV2Conf();
       metricsPublisher.init(conf);
+      metricsPublisher.start();
       assertTrue("Expected to publish container Metrics from RM",
           metricsPublisher.isPublishContainerMetrics());
-      assertTrue(
-          "MultiThreadedDispatcher expected when container Metrics is not published",
-          metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher);
     } finally {
       metricsPublisher.stop();
     }
   }
 
-  @Test(timeout = 1000000)
+  @Test(timeout = 10000)
   public void testPublishApplicationMetrics() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
     RMApp app = createAppAndRegister(appId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5110ff3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 7637410..a642a78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;

Reply via email to