YARN-4129. Refactor the SystemMetricPublisher in RM to better support newer 
events (Naganarasimha G R via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aaf74546
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aaf74546
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aaf74546

Branch: refs/heads/YARN-2928-rebase
Commit: aaf745464b721d1ef2eb1967abcfb42a09cce731
Parents: c091086
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Oct 22 17:56:32 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Mon Nov 9 16:13:16 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../dev-support/findbugs-exclude.xml            |   4 +-
 .../server/resourcemanager/ResourceManager.java |  33 +-
 .../metrics/AbstractSystemMetricsPublisher.java | 168 +++++++++
 .../AbstractTimelineServicePublisher.java       | 184 ----------
 .../metrics/AppAttemptFinishedEvent.java        |  82 -----
 .../metrics/AppAttemptRegisteredEvent.java      |  81 -----
 .../metrics/ApplicationACLsUpdatedEvent.java    |  45 ---
 .../metrics/ApplicationCreatedEvent.java        | 115 ------
 .../metrics/ApplicationFinishedEvent.java       |  91 -----
 .../metrics/ApplicationUpdatedEvent.java        |  54 ---
 .../metrics/ContainerCreatedEvent.java          |  73 ----
 .../metrics/ContainerFinishedEvent.java         |  65 ----
 .../metrics/NoOpSystemMetricPublisher.java      |  65 ++++
 .../metrics/SystemMetricsEvent.java             |  33 --
 .../metrics/SystemMetricsEventType.java         |  36 --
 .../metrics/SystemMetricsPublisher.java         | 236 +-----------
 .../metrics/TimelineServiceV1Publisher.java     | 264 ++++++++------
 .../metrics/TimelineServiceV2Publisher.java     | 364 ++++++++++++-------
 .../metrics/TestSystemMetricsPublisher.java     |   5 +-
 .../TestSystemMetricsPublisherForV2.java        |  22 +-
 .../scheduler/fair/TestFSLeafQueue.java         |   1 -
 22 files changed, 671 insertions(+), 1353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7959609..7c40f8c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -135,6 +135,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3836. add equals and hashCode to TimelineEntity and other classes in
     the data model (Li Lu via sjlee)
 
+    YARN-4129. Refactor the SystemMetricPublisher in RM to better support
+    newer events (Naganarasimha G R via sjlee)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index dae0353..85e27d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -117,12 +117,12 @@
 
   <!-- Object cast is based on the event type -->
   <Match>
-    <Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
      <Bug pattern="BC_UNCONFIRMED_CAST" />
   </Match>
 
   <Match>
-    <Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher$TimelineV2EventHandler" />
      <Bug pattern="BC_UNCONFIRMED_CAST" />
   </Match>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 6b58942..a5de053 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -64,7 +64,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -94,11 +97,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
-import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@@ -266,8 +269,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
     addService(rmApplicationHistoryWriter);
     rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
 
-    SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
-    addService(systemMetricsPublisher);
+    SystemMetricsPublisher systemMetricsPublisher =
+        createSystemMetricsPublisher();
+    addIfService(systemMetricsPublisher);
     rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
 
     super.serviceInit(this.conf);
@@ -371,7 +375,24 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
-    return new SystemMetricsPublisher(rmContext);
+    boolean timelineServiceEnabled =
+        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+    SystemMetricsPublisher publisher = null;
+    if (timelineServiceEnabled) {
+      if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+          YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
+        LOG.info("TimelineService V1 is configured");
+        publisher = new TimelineServiceV1Publisher();
+      } else {
+        LOG.info("TimelineService V2 is configured");
+        publisher = new TimelineServiceV2Publisher(rmContext);
+      }
+    } else {
+      LOG.info("TimelineServicePublisher is not configured");
+      publisher = new NoOpSystemMetricPublisher();
+    }
+    return publisher;
   }
 
   // sanity check for configurations
@@ -494,10 +515,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
       addService(rmApplicationHistoryWriter);
       rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
 
-      SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
-      addService(systemMetricsPublisher);
-      rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
-
       RMTimelineCollectorManager timelineCollectorManager =
           createRMTimelineCollectorManager();
       addService(timelineCollectorManager);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java
new file mode 100644
index 0000000..a8c00a4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public abstract class AbstractSystemMetricsPublisher extends CompositeService
+    implements SystemMetricsPublisher {
+  private MultiThreadedDispatcher dispatcher;
+
+  protected Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  public AbstractSystemMetricsPublisher(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    dispatcher =
+    new MultiThreadedDispatcher(getConfig().getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+        YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
+    dispatcher.setDrainEventsOnStop();
+    addIfService(dispatcher);
+    super.serviceInit(conf);
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static class MultiThreadedDispatcher extends CompositeService
+      implements Dispatcher {
+
+    private List<AsyncDispatcher> dispatchers =
+        new ArrayList<AsyncDispatcher>();
+
+    public MultiThreadedDispatcher(int num) {
+      super(MultiThreadedDispatcher.class.getName());
+      for (int i = 0; i < num; ++i) {
+        AsyncDispatcher dispatcher = createDispatcher();
+        dispatchers.add(dispatcher);
+        addIfService(dispatcher);
+      }
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return new CompositEventHandler();
+    }
+
+    @Override
+    public void register(Class<? extends Enum> eventType,
+        EventHandler handler) {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.register(eventType, handler);
+      }
+    }
+
+    public void setDrainEventsOnStop() {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.setDrainEventsOnStop();
+      }
+    }
+
+    private class CompositEventHandler implements EventHandler<Event> {
+
+      @Override
+      public void handle(Event event) {
+        // Use hashCode (of ApplicationId) to dispatch the event to the child
+        // dispatcher, such that all the writing events of one application will
+        // be handled by one thread, and the scheduled order of these events
+        // will be preserved
+        int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
+        dispatchers.get(index).getEventHandler().handle(event);
+      }
+    }
+
+    protected AsyncDispatcher createDispatcher() {
+      return new AsyncDispatcher();
+    }
+  }
+
+  /**
+   * EventType which is used while publishing the events
+   */
+  protected static enum SystemMetricsEventType {
+    PUBLISH_ENTITY, PUBLISH_APPLICATION_FINISHED_ENTITY
+  }
+
+  /**
+   * TimelinePublishEvent's hash code should be based on the application's id;
+   * this ensures that all the events related to a particular app go to one
+   * particular thread of the MultiThreadedDispatcher.
+   */
+  protected static abstract class TimelinePublishEvent
+      extends AbstractEvent<SystemMetricsEventType> {
+
+    private ApplicationId appId;
+
+    public TimelinePublishEvent(SystemMetricsEventType type,
+        ApplicationId appId) {
+      super(type);
+      this.appId = appId;
+    }
+
+    public ApplicationId getApplicationId() {
+      return appId;
+    }
+
+    @Override
+    public int hashCode() {
+      return appId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof TimelinePublishEvent)) {
+        return false;
+      }
+      TimelinePublishEvent other = (TimelinePublishEvent) obj;
+      if (appId == null) {
+        if (other.appId != null) {
+          return false;
+        }
+      } else if (getType() == null) {
+        if (other.getType() != null) {
+          return false;
+        }
+      } else
+        if (!appId.equals(other.appId) || !getType().equals(other.getType())) {
+        return false;
+      }
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
deleted file mode 100644
index 12145267..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher;
-
-public abstract class AbstractTimelineServicePublisher extends CompositeService
-    implements TimelineServicePublisher, EventHandler<SystemMetricsEvent> {
-
-  private static final Log LOG = LogFactory
-      .getLog(TimelineServiceV2Publisher.class);
-
-  private Configuration conf;
-
-  public AbstractTimelineServicePublisher(String name) {
-    super(name);
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    this.conf = conf;
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-  @Override
-  public void handle(SystemMetricsEvent event) {
-    switch (event.getType()) {
-    case APP_CREATED:
-      publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
-      break;
-    case APP_FINISHED:
-      publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
-      break;
-    case APP_UPDATED:
-      publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
-      break;
-    case APP_ACLS_UPDATED:
-      publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
-      break;
-    case APP_ATTEMPT_REGISTERED:
-      publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
-      break;
-    case APP_ATTEMPT_FINISHED:
-      publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
-      break;
-    case CONTAINER_CREATED:
-      publishContainerCreatedEvent((ContainerCreatedEvent) event);
-      break;
-    case CONTAINER_FINISHED:
-      publishContainerFinishedEvent((ContainerFinishedEvent) event);
-      break;
-    default:
-      LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
-    }
-  }
-
-  abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event);
-
-  abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event);
-
-  abstract void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event);
-
-  abstract void publishApplicationACLsUpdatedEvent(
-      ApplicationACLsUpdatedEvent event);
-
-  abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event);
-
-  abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event);
-
-  abstract void publishContainerCreatedEvent(ContainerCreatedEvent event);
-
-  abstract void publishContainerFinishedEvent(ContainerFinishedEvent event);
-
-  @Override
-  public Dispatcher getDispatcher() {
-    MultiThreadedDispatcher dispatcher =
-        new MultiThreadedDispatcher(
-            conf.getInt(
-                YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
-                YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
-    dispatcher.setDrainEventsOnStop();
-    return dispatcher;
-  }
-
-  @Override
-  public boolean publishRMContainerMetrics() {
-    return true;
-  }
-
-  @Override
-  public EventHandler<SystemMetricsEvent> getEventHandler() {
-    return this;
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public static class MultiThreadedDispatcher extends CompositeService
-      implements Dispatcher {
-
-    private List<AsyncDispatcher> dispatchers =
-        new ArrayList<AsyncDispatcher>();
-
-    public MultiThreadedDispatcher(int num) {
-      super(MultiThreadedDispatcher.class.getName());
-      for (int i = 0; i < num; ++i) {
-        AsyncDispatcher dispatcher = createDispatcher();
-        dispatchers.add(dispatcher);
-        addIfService(dispatcher);
-      }
-    }
-
-    @Override
-    public EventHandler getEventHandler() {
-      return new CompositEventHandler();
-    }
-
-    @Override
-    public void register(Class<? extends Enum> eventType, EventHandler handler) {
-      for (AsyncDispatcher dispatcher : dispatchers) {
-        dispatcher.register(eventType, handler);
-      }
-    }
-
-    public void setDrainEventsOnStop() {
-      for (AsyncDispatcher dispatcher : dispatchers) {
-        dispatcher.setDrainEventsOnStop();
-      }
-    }
-
-    private class CompositEventHandler implements EventHandler<Event> {
-
-      @Override
-      public void handle(Event event) {
-        // Use hashCode (of ApplicationId) to dispatch the event to the child
-        // dispatcher, such that all the writing events of one application will
-        // be handled by one thread, the scheduled order of the these events
-        // will be preserved
-        int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
-        dispatchers.get(index).getEventHandler().handle(event);
-      }
-    }
-
-    protected AsyncDispatcher createDispatcher() {
-      return new AsyncDispatcher();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java
deleted file mode 100644
index 71d9363..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
-
-public class AppAttemptFinishedEvent extends
-    SystemMetricsEvent {
-
-  private ApplicationAttemptId appAttemptId;
-  private String trackingUrl;
-  private String originalTrackingUrl;
-  private String diagnosticsInfo;
-  private FinalApplicationStatus appStatus;
-  private YarnApplicationAttemptState state;
-
-  public AppAttemptFinishedEvent(
-      ApplicationAttemptId appAttemptId,
-      String trackingUrl,
-      String originalTrackingUrl,
-      String diagnosticsInfo,
-      FinalApplicationStatus appStatus,
-      YarnApplicationAttemptState state,
-      long finishedTime) {
-    super(SystemMetricsEventType.APP_ATTEMPT_FINISHED, finishedTime);
-    this.appAttemptId = appAttemptId;
-    // This is the tracking URL after the application attempt is finished
-    this.trackingUrl = trackingUrl;
-    this.originalTrackingUrl = originalTrackingUrl;
-    this.diagnosticsInfo = diagnosticsInfo;
-    this.appStatus = appStatus;
-    this.state = state;
-  }
-
-  @Override
-  public int hashCode() {
-    return appAttemptId.getApplicationId().hashCode();
-  }
-
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return appAttemptId;
-  }
-
-  public String getTrackingUrl() {
-    return trackingUrl;
-  }
-
-  public String getOriginalTrackingURL() {
-    return originalTrackingUrl;
-  }
-
-  public String getDiagnosticsInfo() {
-    return diagnosticsInfo;
-  }
-
-  public FinalApplicationStatus getFinalApplicationStatus() {
-    return appStatus;
-  }
-
-  public YarnApplicationAttemptState getYarnApplicationAttemptState() {
-    return state;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java
deleted file mode 100644
index 1d0f16d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-
-public class AppAttemptRegisteredEvent extends
-    SystemMetricsEvent {
-
-  private ApplicationAttemptId appAttemptId;
-  private String host;
-  private int rpcPort;
-  private String trackingUrl;
-  private String originalTrackingUrl;
-  private ContainerId masterContainerId;
-
-  public AppAttemptRegisteredEvent(
-      ApplicationAttemptId appAttemptId,
-      String host,
-      int rpcPort,
-      String trackingUrl,
-      String originalTrackingUrl,
-      ContainerId masterContainerId,
-      long registeredTime) {
-    super(SystemMetricsEventType.APP_ATTEMPT_REGISTERED, registeredTime);
-    this.appAttemptId = appAttemptId;
-    this.host = host;
-    this.rpcPort = rpcPort;
-    // This is the tracking URL after the application attempt is registered
-    this.trackingUrl = trackingUrl;
-    this.originalTrackingUrl = originalTrackingUrl;
-    this.masterContainerId = masterContainerId;
-  }
-
-  @Override
-  public int hashCode() {
-    return appAttemptId.getApplicationId().hashCode();
-  }
-
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return appAttemptId;
-  }
-
-  public String getHost() {
-    return host;
-  }
-
-  public int getRpcPort() {
-    return rpcPort;
-  }
-
-  public String getTrackingUrl() {
-    return trackingUrl;
-  }
-
-  public String getOriginalTrackingURL() {
-    return originalTrackingUrl;
-  }
-
-  public ContainerId getMasterContainerId() {
-    return masterContainerId;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java
deleted file mode 100644
index c8b314c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-
-public class ApplicationACLsUpdatedEvent extends SystemMetricsEvent {
-
-  private ApplicationId appId;
-  private String viewAppACLs;
-
-  public ApplicationACLsUpdatedEvent(ApplicationId appId,
-      String viewAppACLs,
-      long updatedTime) {
-    super(SystemMetricsEventType.APP_ACLS_UPDATED, updatedTime);
-    this.appId = appId;
-    this.viewAppACLs = viewAppACLs;
-  }
-
-  public ApplicationId getApplicationId() {
-    return appId;
-  }
-
-  public String getViewAppACLs() {
-    return viewAppACLs;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java
deleted file mode 100644
index a684dfc..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Priority;
-
-public class ApplicationCreatedEvent extends
-    SystemMetricsEvent {
-
-  private ApplicationId appId;
-  private String name;
-  private String type;
-  private String user;
-  private String queue;
-  private long submittedTime;
-  private Set<String> appTags;
-  private boolean unmanagedApplication;
-  private Priority applicationPriority;
-  private String appNodeLabelsExpression;
-  private String amNodeLabelsExpression;
-
-  public ApplicationCreatedEvent(ApplicationId appId,
-      String name,
-      String type,
-      String user,
-      String queue,
-      long submittedTime,
-      long createdTime,
-      Set<String> appTags,
-      boolean unmanagedApplication,
-      Priority applicationPriority,
-      String appNodeLabelsExpression,
-      String amNodeLabelsExpression) {
-    super(SystemMetricsEventType.APP_CREATED, createdTime);
-    this.appId = appId;
-    this.name = name;
-    this.type = type;
-    this.user = user;
-    this.queue = queue;
-    this.submittedTime = submittedTime;
-    this.appTags = appTags;
-    this.unmanagedApplication = unmanagedApplication;
-    this.applicationPriority = applicationPriority;
-    this.appNodeLabelsExpression = appNodeLabelsExpression;
-    this.amNodeLabelsExpression = amNodeLabelsExpression;
-  }
-
-  @Override
-  public int hashCode() {
-    return appId.hashCode();
-  }
-
-  public ApplicationId getApplicationId() {
-    return appId;
-  }
-
-  public String getApplicationName() {
-    return name;
-  }
-
-  public String getApplicationType() {
-    return type;
-  }
-
-  public String getUser() {
-    return user;
-  }
-
-  public String getQueue() {
-    return queue;
-  }
-
-  public long getSubmittedTime() {
-    return submittedTime;
-  }
-
-  public Set<String> getAppTags() {
-    return appTags;
-  }
-
-  public boolean isUnmanagedApp() {
-    return unmanagedApplication;
-  }
-
-  public Priority getApplicationPriority() {
-    return applicationPriority;
-  }
-
-  public String getAppNodeLabelsExpression() {
-    return appNodeLabelsExpression;
-  }
-
-  public String getAmNodeLabelsExpression() {
-    return amNodeLabelsExpression;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
deleted file mode 100644
index d9241b2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
-
-public class ApplicationFinishedEvent extends
-    SystemMetricsEvent {
-
-  private ApplicationId appId;;
-  private String diagnosticsInfo;
-  private FinalApplicationStatus appStatus;
-  private YarnApplicationState state;
-  private ApplicationAttemptId latestAppAttemptId;
-  private RMAppMetrics appMetrics;
-  private RMAppImpl app;
-
-  public ApplicationFinishedEvent(
-      ApplicationId appId,
-      String diagnosticsInfo,
-      FinalApplicationStatus appStatus,
-      YarnApplicationState state,
-      ApplicationAttemptId latestAppAttemptId,
-      long finishedTime,
-      RMAppMetrics appMetrics,
-      RMAppImpl app) {
-    super(SystemMetricsEventType.APP_FINISHED, finishedTime);
-    this.appId = appId;
-    this.diagnosticsInfo = diagnosticsInfo;
-    this.appStatus = appStatus;
-    this.latestAppAttemptId = latestAppAttemptId;
-    this.state = state;
-    this.appMetrics = appMetrics;
-    this.app = app;
-  }
-
-  @Override
-  public int hashCode() {
-    return appId.hashCode();
-  }
-
-  public RMAppImpl getApp() {
-    return app;
-  }
-
-  public ApplicationId getApplicationId() {
-    return appId;
-  }
-
-  public String getDiagnosticsInfo() {
-    return diagnosticsInfo;
-  }
-
-  public FinalApplicationStatus getFinalApplicationStatus() {
-    return appStatus;
-  }
-
-  public YarnApplicationState getYarnApplicationState() {
-    return state;
-  }
-
-  public ApplicationAttemptId getLatestApplicationAttemptId() {
-    return latestAppAttemptId;
-  }
-
-  public RMAppMetrics getAppMetrics() {
-    return appMetrics;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java
deleted file mode 100644
index 9e5e1fd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Priority;
-
-public class ApplicationUpdatedEvent extends SystemMetricsEvent {
-
-  private ApplicationId appId;
-  private String queue;
-  private Priority applicationPriority;
-
-  public ApplicationUpdatedEvent(ApplicationId appId, String queue,
-      long updatedTime, Priority applicationPriority) {
-    super(SystemMetricsEventType.APP_UPDATED, updatedTime);
-    this.appId = appId;
-    this.queue = queue;
-    this.applicationPriority = applicationPriority;
-  }
-
-  @Override
-  public int hashCode() {
-    return appId.hashCode();
-  }
-
-  public ApplicationId getApplicationId() {
-    return appId;
-  }
-
-  public String getQueue() {
-    return queue;
-  }
-
-  public Priority getApplicationPriority() {
-    return applicationPriority;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java
deleted file mode 100644
index 05b6781..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public class ContainerCreatedEvent extends SystemMetricsEvent {
-
-  private ContainerId containerId;
-  private Resource allocatedResource;
-  private NodeId allocatedNode;
-  private Priority allocatedPriority;
-  private String nodeHttpAddress;
-
-  public ContainerCreatedEvent(
-      ContainerId containerId,
-      Resource allocatedResource,
-      NodeId allocatedNode,
-      Priority allocatedPriority,
-      long createdTime,
-      String nodeHttpAddress) {
-    super(SystemMetricsEventType.CONTAINER_CREATED, createdTime);
-    this.containerId = containerId;
-    this.allocatedResource = allocatedResource;
-    this.allocatedNode = allocatedNode;
-    this.allocatedPriority = allocatedPriority;
-    this.nodeHttpAddress = nodeHttpAddress;
-  }
-
-  @Override
-  public int hashCode() {
-    return containerId.getApplicationAttemptId().getApplicationId().hashCode();
-  }
-
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-  public Resource getAllocatedResource() {
-    return allocatedResource;
-  }
-
-  public NodeId getAllocatedNode() {
-    return allocatedNode;
-  }
-
-  public Priority getAllocatedPriority() {
-    return allocatedPriority;
-  }
-
-  public String getNodeHttpAddress() {
-    return nodeHttpAddress;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java
deleted file mode 100644
index aafd760..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-
-public class ContainerFinishedEvent extends SystemMetricsEvent {
-
-  private ContainerId containerId;
-  private String diagnosticsInfo;
-  private int containerExitStatus;
-  private ContainerState state;
-
-  public ContainerFinishedEvent(
-      ContainerId containerId,
-      String diagnosticsInfo,
-      int containerExitStatus,
-      ContainerState state,
-      long finishedTime) {
-    super(SystemMetricsEventType.CONTAINER_FINISHED, finishedTime);
-    this.containerId = containerId;
-    this.diagnosticsInfo = diagnosticsInfo;
-    this.containerExitStatus = containerExitStatus;
-    this.state = state;
-  }
-
-  @Override
-  public int hashCode() {
-    return containerId.getApplicationAttemptId().getApplicationId().hashCode();
-  }
-
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-  public String getDiagnosticsInfo() {
-    return diagnosticsInfo;
-  }
-
-  public int getContainerExitStatus() {
-    return containerExitStatus;
-  }
-
-  public ContainerState getContainerState() {
-    return state;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java
new file mode 100644
index 0000000..1810df1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * A no-op {@link SystemMetricsPublisher}: every method of the interface is
+ * implemented with an empty body, so invoking any of them does nothing.
+ */
+public class NoOpSystemMetricPublisher implements SystemMetricsPublisher{
+
+  @Override
+  public void appCreated(RMApp app, long createdTime) {
+  }
+
+  @Override
+  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+  }
+
+  @Override
+  public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+  }
+
+  @Override
+  public void appAttemptRegistered(RMAppAttempt appAttempt,
+      long registeredTime) {
+  }
+
+  @Override
+  public void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+  }
+
+  @Override
+  public void containerCreated(RMContainer container, long createdTime) {
+  }
+
+  @Override
+  public void containerFinished(RMContainer container, long finishedTime) {
+  }
+
+  @Override
+  public void appUpdated(RMApp app, long currentTimeMillis) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java
deleted file mode 100644
index 1847396..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-public class SystemMetricsEvent extends AbstractEvent<SystemMetricsEventType> {
-
-  public SystemMetricsEvent(SystemMetricsEventType type) {
-    super(type);
-  }
-
-  public SystemMetricsEvent(SystemMetricsEventType type, long timestamp) {
-    super(type, timestamp);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java
deleted file mode 100644
index c11034e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-
-public enum SystemMetricsEventType {
-  // app events
-  APP_CREATED,
-  APP_FINISHED,
-  APP_ACLS_UPDATED,
-  APP_UPDATED,
-
-  // app attempt events
-  APP_ATTEMPT_REGISTERED,
-  APP_ATTEMPT_FINISHED,
-
-  // container events
-  CONTAINER_CREATED,
-  CONTAINER_FINISHED
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
index 6ab7838..f895bba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
@@ -18,244 +18,28 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.metrics;
 
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * The class that helps RM publish metrics to the timeline server. RM will
- * always invoke the methods of this class regardless the service is enabled or
- * not. If it is disabled, publishing requests will be ignored silently.
- */
-@Private
-@Unstable
-public class SystemMetricsPublisher extends CompositeService {
-
-  private static final Log LOG = LogFactory
-      .getLog(SystemMetricsPublisher.class);
-
-  private Dispatcher dispatcher;
-  private boolean publishSystemMetrics;
-  private boolean publishContainerMetrics;
-  protected RMContext rmContext;
-
-  public SystemMetricsPublisher(RMContext rmContext) {
-    super(SystemMetricsPublisher.class.getName());
-    this.rmContext = rmContext;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    publishSystemMetrics =
-        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
-    if (publishSystemMetrics) {
-      TimelineServicePublisher timelineServicePublisher =
-          getTimelineServicePublisher(conf);
-      if (timelineServicePublisher != null) {
-        addService(timelineServicePublisher);
-        // init required to be called so that other methods of
-        // TimelineServicePublisher can be utilized
-        timelineServicePublisher.init(conf);
-        dispatcher = createDispatcher(timelineServicePublisher);
-        publishContainerMetrics =
-            timelineServicePublisher.publishRMContainerMetrics();
-        dispatcher.register(SystemMetricsEventType.class,
-            timelineServicePublisher.getEventHandler());
-        addIfService(dispatcher);
-      } else {
-        LOG.info("TimelineServicePublisher is not configured");
-        publishSystemMetrics = false;
-      }
-      LOG.info("YARN system metrics publishing service is enabled");
-    } else {
-      LOG.info("YARN system metrics publishing service is not enabled");
-    }
-    super.serviceInit(conf);
-  }
-
-  @VisibleForTesting
-  Dispatcher createDispatcher(TimelineServicePublisher 
timelineServicePublisher) {
-    return timelineServicePublisher.getDispatcher();
-  }
-
-  TimelineServicePublisher getTimelineServicePublisher(Configuration conf) {
-    if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-        YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
-      return new TimelineServiceV1Publisher();
-    } else if (conf.getBoolean(
-        YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
-        YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
-      return new TimelineServiceV2Publisher(rmContext);
-    }
-    return null;
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appCreated(RMApp app, long createdTime) {
-    if (publishSystemMetrics) {
-      ApplicationSubmissionContext appSubmissionContext =
-          app.getApplicationSubmissionContext();
-      dispatcher.getEventHandler().handle(
-          new ApplicationCreatedEvent(
-              app.getApplicationId(),
-              app.getName(),
-              app.getApplicationType(),
-              app.getUser(),
-              app.getQueue(),
-              app.getSubmitTime(),
-              createdTime, app.getApplicationTags(),
-              appSubmissionContext.getUnmanagedAM(),
-              appSubmissionContext.getPriority(),
-              app.getAppNodeLabelExpression(),
-              app.getAmNodeLabelExpression()));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appUpdated(RMApp app, long updatedTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler()
-          .handle(
-              new ApplicationUpdatedEvent(app.getApplicationId(), app
-                  .getQueue(), updatedTime, app
-                  .getApplicationSubmissionContext().getPriority()));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler().handle(
-          new ApplicationFinishedEvent(
-              app.getApplicationId(),
-              app.getDiagnostics().toString(),
-              app.getFinalApplicationStatus(),
-              RMServerUtils.createApplicationState(state),
-              app.getCurrentAppAttempt() == null ?
-                  null : app.getCurrentAppAttempt().getAppAttemptId(),
-              finishedTime,
-              app.getRMAppMetrics(),
-              (RMAppImpl)app));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appACLsUpdated(RMApp app, String appViewACLs,
-      long updatedTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler().handle(
-          new ApplicationACLsUpdatedEvent(
-              app.getApplicationId(),
-              appViewACLs == null ? "" : appViewACLs,
-              updatedTime));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appAttemptRegistered(RMAppAttempt appAttempt,
-      long registeredTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler().handle(
-          new AppAttemptRegisteredEvent(
-              appAttempt.getAppAttemptId(),
-              appAttempt.getHost(),
-              appAttempt.getRpcPort(),
-              appAttempt.getTrackingUrl(),
-              appAttempt.getOriginalTrackingUrl(),
-              appAttempt.getMasterContainer().getId(),
-              registeredTime));
-    }
-  }
+public interface SystemMetricsPublisher {
 
-  @SuppressWarnings("unchecked")
-  public void appAttemptFinished(RMAppAttempt appAttempt,
-      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler().handle(
-          new AppAttemptFinishedEvent(
-              appAttempt.getAppAttemptId(),
-              appAttempt.getTrackingUrl(),
-              appAttempt.getOriginalTrackingUrl(),
-              appAttempt.getDiagnostics(),
-              // app will get the final status from app attempt, or create one
-              // based on app state if it doesn't exist
-              app.getFinalApplicationStatus(),
-              RMServerUtils.createApplicationAttemptState(appAttemtpState),
-              finishedTime));
-    }
-  }
+  void appCreated(RMApp app, long createdTime);
 
-  @SuppressWarnings("unchecked")
-  public void containerCreated(RMContainer container, long createdTime) {
-    if (publishContainerMetrics) {
-      dispatcher.getEventHandler().handle(
-          new ContainerCreatedEvent(
-              container.getContainerId(),
-              container.getAllocatedResource(),
-              container.getAllocatedNode(),
-              container.getAllocatedPriority(),
-              createdTime, container.getNodeHttpAddress()));
-    }
-  }
+  void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime);
 
-  @SuppressWarnings("unchecked")
-  public void containerFinished(RMContainer container, long finishedTime) {
-    if (publishContainerMetrics) {
-      dispatcher.getEventHandler().handle(
-          new ContainerFinishedEvent(
-              container.getContainerId(),
-              container.getDiagnosticsInfo(),
-              container.getContainerExitStatus(),
-              container.getContainerState(),
-              finishedTime));
-    }
-  }
+  void appUpdated(RMApp app, long updatedTime);
 
-  @VisibleForTesting
-  boolean isPublishContainerMetrics() {
-    return publishContainerMetrics;
-  }
+  void appFinished(RMApp app, RMAppState state, long finishedTime);
 
-  @VisibleForTesting
-  Dispatcher getDispatcher() {
-    return dispatcher;
-  }
+  void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime);
 
-  interface TimelineServicePublisher extends Service {
-    /**
-     * @return the Dispatcher which needs to be used to dispatch events
-     */
-    Dispatcher getDispatcher();
+  void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime);
 
-    /**
-     * @return true if RMContainerMetricsNeeds to be sent
-     */
-    boolean publishRMContainerMetrics();
+  void containerCreated(RMContainer container, long createdTime);
 
-    /**
-     * @return EventHandler which needs to be registered to the dispatcher to
-     *         handle the SystemMetricsEvent
-     */
-    EventHandler<SystemMetricsEvent> getEventHandler();
-  }
+  void containerFinished(RMContainer container, long finishedTime);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf74546/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
index 0dd1bca..a236e4e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
@@ -30,17 +30,23 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
-public class TimelineServiceV1Publisher extends
-    AbstractTimelineServicePublisher {
+public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher 
{
 
-  private static final Log LOG = LogFactory
-      .getLog(TimelineServiceV1Publisher.class);
+  private static final Log LOG =
+      LogFactory.getLog(TimelineServiceV1Publisher.class);
 
   public TimelineServiceV1Publisher() {
     super("TimelineserviceV1Publisher");
@@ -49,62 +55,70 @@ public class TimelineServiceV1Publisher extends
   private TimelineClient client;
 
   @Override
-  public void serviceInit(Configuration conf) throws Exception {
+  protected void serviceInit(Configuration conf) throws Exception {
     client = TimelineClient.createTimelineClient();
     addIfService(client);
     super.serviceInit(conf);
+    getDispatcher().register(SystemMetricsEventType.class,
+        new TimelineV1EventHandler());
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appCreated(RMApp app, long createdTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
     Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
-        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, 
app.getName());
     entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
-        event.getApplicationType());
-    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
-        event.getUser());
+        app.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, 
app.getUser());
     entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
+        app.getQueue());
     entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
-        event.getSubmittedTime());
+        app.getSubmitTime());
     entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
-        event.getAppTags());
+        app.getApplicationTags());
     entityInfo.put(
         ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
-        event.isUnmanagedApp());
+        app.getApplicationSubmissionContext().getUnmanagedAM());
     entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
-        event.getApplicationPriority().getPriority());
+        app.getApplicationSubmissionContext().getPriority().getPriority());
+    entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
+        app.getAmNodeLabelExpression());
+    entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
+        app.getAppNodeLabelExpression());
     entity.setOtherInfo(entityInfo);
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(createdTime);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, 
app.getApplicationId()));
   }
 
   @Override
-  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationState().toString());
-    if (event.getLatestApplicationAttemptId() != null) {
+        app.getDiagnostics().toString());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
+        RMServerUtils.createApplicationState(state).toString());
+    String latestApplicationAttemptId = app.getCurrentAppAttempt() == null
+        ? null : app.getCurrentAppAttempt().getAppAttemptId().toString();
+    if (latestApplicationAttemptId != null) {
       eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
-          event.getLatestApplicationAttemptId().toString());
+          latestApplicationAttemptId);
     }
-    RMAppMetrics appMetrics = event.getAppMetrics();
+    RMAppMetrics appMetrics = app.getRMAppMetrics();
     entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
         appMetrics.getVcoreSeconds());
     entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
@@ -112,153 +126,175 @@ public class TimelineServiceV1Publisher extends
     tEvent.setEventInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    // Publish the finish event synchronously to avoid the application's
+    // finished state being saved in RMStateStore without a matching ATS update.
+    putEntity(entity);// sync event so that ATS update is done without fail
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appUpdated(RMApp app, long updatedTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
-    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
-        .getApplicationPriority().getPriority());
+        app.getQueue());
+    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
+        app.getApplicationSubmissionContext().getPriority().getPriority());
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(updatedTime);
     tEvent.setEventInfo(eventInfo);
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, 
app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
 
     TimelineEvent tEvent = new TimelineEvent();
     Map<String, Object> entityInfo = new HashMap<String, Object>();
     entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
-        event.getViewAppACLs());
+        (appViewACLs == null) ? "" : appViewACLs);
     entity.setOtherInfo(entityInfo);
     tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(updatedTime);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createApplicationEntity(
-      ApplicationId applicationId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(applicationId.toString());
-    return entity;
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, 
app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+  public void appAttemptRegistered(RMAppAttempt appAttempt,
+      long registeredTime) {
     TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(registeredTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
+        appAttempt.getTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
-    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
+        appAttempt.getOriginalTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
+        appAttempt.getHost());
     eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
-        event.getRpcPort());
-    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event
-        .getMasterContainerId().toString());
+        appAttempt.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
+        appAttempt.getMasterContainer().getId().toString());
     tEvent.setEventInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(
+        new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
+
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+  public void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
     TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
+        appAttempt.getTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
+        appAttempt.getOriginalTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationAttemptState().toString());
+        appAttempt.getDiagnostics());
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
+        .createApplicationAttemptState(appAttemtpState).toString());
     tEvent.setEventInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createAppAttemptEntity(
-      ApplicationAttemptId appAttemptId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(appAttemptId.toString());
-    entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
-        appAttemptId.getApplicationId().toString());
-    return entity;
+    getDispatcher().getEventHandler().handle(
+        new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
+  public void containerCreated(RMContainer container, long createdTime) {
+    TimelineEntity entity = createContainerEntity(container.getContainerId());
     Map<String, Object> entityInfo = new HashMap<String, Object>();
     entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
-        event.getAllocatedResource().getMemory());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
-        .getAllocatedResource().getVirtualCores());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
-        .getAllocatedNode().getHost());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
-        .getAllocatedNode().getPort());
+        container.getAllocatedResource().getMemory());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+        container.getAllocatedResource().getVirtualCores());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+        container.getAllocatedNode().getHost());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+        container.getAllocatedNode().getPort());
     entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
-        event.getAllocatedPriority().getPriority());
+        container.getAllocatedPriority().getPriority());
     entityInfo.put(
         ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
-        event.getNodeHttpAddress());
+        container.getNodeHttpAddress());
     entity.setOtherInfo(entityInfo);
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(createdTime);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+            .getContainerId().getApplicationAttemptId().getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
+  public void containerFinished(RMContainer container, long finishedTime) {
+    TimelineEntity entity = createContainerEntity(container.getContainerId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
+        container.getDiagnosticsInfo());
     eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
-        event.getContainerExitStatus());
-    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
-        .getContainerState().toString());
+        container.getContainerExitStatus());
+    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
+        container.getContainerState().toString());
     tEvent.setEventInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+            .getContainerId().getApplicationAttemptId().getApplicationId()));
+  }
+
+  private static TimelineEntity createApplicationEntity(
+      ApplicationId applicationId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(applicationId.toString());
+    return entity;
+  }
+
+  private static TimelineEntity createAppAttemptEntity(
+      ApplicationAttemptId appAttemptId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(appAttemptId.toString());
+    entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
+        appAttemptId.getApplicationId().toString());
+    return entity;
   }
 
   private static TimelineEntity createContainerEntity(ContainerId containerId) 
{
@@ -283,4 +319,26 @@ public class TimelineServiceV1Publisher extends
           + entity.getEntityId() + "]", e);
     }
   }
+
+  private class TimelineV1PublishEvent extends TimelinePublishEvent {
+    private TimelineEntity entity;
+
+    public TimelineV1PublishEvent(SystemMetricsEventType type,
+        TimelineEntity entity, ApplicationId appId) {
+      super(type, appId);
+      this.entity = entity;
+    }
+
+    public TimelineEntity getEntity() {
+      return entity;
+    }
+  }
+
+  private class TimelineV1EventHandler
+      implements EventHandler<TimelineV1PublishEvent> {
+    @Override
+    public void handle(TimelineV1PublishEvent event) {
+      putEntity(event.getEntity());
+    }
+  }
 }

Reply via email to