http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 53d8ae5..45ff79c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -97,6 +97,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -152,6 +153,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private long storedFinishTime = 0;
   private int firstAttemptIdInStateStore = 1;
   private int nextAttemptId = 1;
+  private String collectorAddr;
   // This field isn't protected by readlock now.
   private volatile RMAppAttempt currentAttempt;
   private String queue;
@@ -199,6 +201,8 @@ public class RMAppImpl implements RMApp, Recoverable {
      // Transitions from NEW state
     .addTransition(RMAppState.NEW, RMAppState.NEW,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.NEW, RMAppState.NEW,
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppNewlySavingTransition())
     .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@@ -215,6 +219,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     // Transitions from NEW_SAVING state
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
         RMAppEventType.APP_NEW_SAVED, new 
AddApplicationToSchedulerTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@@ -233,6 +239,8 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
         RMAppEventType.APP_REJECTED,
         new FinalSavingTransition(
@@ -249,6 +257,8 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
+    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
             YarnApplicationState.RUNNING))
@@ -276,6 +286,8 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.MOVE, new RMAppMoveTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_UNREGISTERED,
         new FinalSavingTransition(
@@ -305,6 +317,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
+    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -316,6 +330,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
+    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
       EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -327,6 +343,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
+    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_KILLED,
         new FinalSavingTransition(
@@ -398,8 +416,19 @@ public class RMAppImpl implements RMApp, Recoverable {
       Configuration config, String name, String user, String queue,
       ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
       ApplicationMasterService masterService, long submitTime,
-      String applicationType, Set<String> applicationTags, 
+      String applicationType, Set<String> applicationTags,
       ResourceRequest amReq) {
+    this(applicationId, rmContext, config, name, user, queue, 
submissionContext,
+      scheduler, masterService, submitTime, applicationType, applicationTags,
+      amReq, -1);
+  }
+
+  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
+      Configuration config, String name, String user, String queue,
+      ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
+      ApplicationMasterService masterService, long submitTime,
+      String applicationType, Set<String> applicationTags,
+      ResourceRequest amReq, long startTime) {
 
     this.systemClock = SystemClock.getInstance();
 
@@ -415,7 +444,11 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.scheduler = scheduler;
     this.masterService = masterService;
     this.submitTime = submitTime;
-    this.startTime = this.systemClock.getTime();
+    if (startTime <= 0) {
+      this.startTime = this.systemClock.getTime();
+    } else {
+      this.startTime = startTime;
+    }
     this.applicationType = applicationType;
     this.applicationTags = applicationTags;
     this.amReq = amReq;
@@ -493,6 +526,25 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
+  /**
+   * Starts the application level timeline collector for this app. This should
+   * be used only if the timeline service v.2 is enabled.
+   */
+  public void startTimelineCollector() {
+    AppLevelTimelineCollector collector =
+        new AppLevelTimelineCollector(applicationId);
+    rmContext.getRMTimelineCollectorManager().putIfAbsent(
+        applicationId, collector);
+  }
+
+  /**
+   * Stops the application level timeline collector for this app. This should 
be
+   * used only if the timeline service v.2 is enabled.
+   */
+  public void stopTimelineCollector() {
+    rmContext.getRMTimelineCollectorManager().remove(applicationId);
+  }
+
   @Override
   public ApplicationId getApplicationId() {
     return this.applicationId;
@@ -562,6 +614,21 @@ public class RMAppImpl implements RMApp, Recoverable {
   }
 
   @Override
+  public String getCollectorAddr() {
+    return this.collectorAddr;
+  }
+
+  @Override
+  public void setCollectorAddr(String collectorAddress) {
+    this.collectorAddr = collectorAddress;
+  }
+
+  @Override
+  public void removeCollectorAddr() {
+    this.collectorAddr = null;
+  }
+
+  @Override
   public String getName() {
     return this.name;
   }
@@ -829,10 +896,11 @@ public class RMAppImpl implements RMApp, Recoverable {
       this.firstAttemptIdInStateStore = appState.getFirstAttemptId();
       this.nextAttemptId = firstAttemptIdInStateStore;
     }
+    //TODO recover collector address.
+    //this.collectorAddr = appState.getCollectorAddr();
 
     // send the ATS create Event
     sendATSCreateEvent(this, this.startTime);
-
     RMAppAttemptImpl preAttempt = null;
     for (ApplicationAttemptId attemptId :
         new TreeSet<>(appState.attempts.keySet())) {
@@ -902,7 +970,24 @@ public class RMAppImpl implements RMApp, Recoverable {
       SingleArcTransition<RMAppImpl, RMAppEvent> {
     public void transition(RMAppImpl app, RMAppEvent event) {
     };
+  }
+
+  private static final class RMAppCollectorUpdateTransition
+      extends RMAppTransition {
+
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) {
+        LOG.info("Updating collector info for app: " + app.getApplicationId());
 
+        RMAppCollectorUpdateEvent appCollectorUpdateEvent =
+            (RMAppCollectorUpdateEvent) event;
+        // Update collector address
+        app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
+
+        // TODO: persist the collector address to the RMStateStore so that
+        // it can be recovered.
+      }
+    };
   }
 
   private static final class RMAppNodeUpdateTransition extends RMAppTransition 
{
@@ -1797,7 +1882,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
     return amNodeLabelExpression;
   }
-  
+
   @Override
   public CallerContext getCallerContext() {
     return callerContext;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
new file mode 100644
index 0000000..64c3749
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
+
+import java.util.Locale;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * This class extends TimelineCollectorManager to provide RM specific
+ * implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RMTimelineCollectorManager extends TimelineCollectorManager {
+  private static final Log LOG =
+      LogFactory.getLog(RMTimelineCollectorManager.class);
+
+  private final RMContext rmContext;
+
+  /**
+   * Creates a collector manager that resolves applications through the given
+   * RM context.
+   *
+   * @param rmContext RM context used to look up applications by id
+   */
+  public RMTimelineCollectorManager(RMContext rmContext) {
+    super(RMTimelineCollectorManager.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  /**
+   * Populates the context of a collector from the application it belongs to:
+   * the user id, and the flow name, flow version and flow run id. Defaults are
+   * set first and are then overridden by any flow tags found among the
+   * application tags. A flow run id tag whose value is not a valid long is
+   * logged and ignored.
+   *
+   * @param appId id of the application whose details fill the context
+   * @param collector collector whose context is populated
+   * @throws YarnRuntimeException if the RM context has no application with
+   *     the given id
+   */
+  @Override
+  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
+    RMApp app = rmContext.getRMApps().get(appId);
+    if (app == null) {
+      throw new YarnRuntimeException(
+          "Unable to get the timeline collector context info for a " +
+          "non-existing app " + appId);
+    }
+    String userId = app.getUser();
+    TimelineCollectorContext context = collector.getTimelineEntityContext();
+    if (userId != null && !userId.isEmpty()) {
+      context.setUserId(userId);
+    }
+
+    // initialize the flow in the environment with default values for those
+    // that do not specify the flow tags
+    // flow name: app name (or app id if app name is missing),
+    // flow version: "1", flow run id: start time
+    context.setFlowName(TimelineUtils.generateDefaultFlowName(
+        app.getName(), appId));
+    context.setFlowVersion(TimelineUtils.DEFAULT_FLOW_VERSION);
+    context.setFlowRunId(app.getStartTime());
+
+    // the flow context is received via the application tags
+    for (String tag : app.getApplicationTags()) {
+      String[] parts = tag.split(":", 2);
+      if (parts.length != 2 || parts[1].isEmpty()) {
+        continue;
+      }
+      // Upper-case with a fixed locale so the prefix match does not depend on
+      // the JVM default locale.
+      switch (parts[0].toUpperCase(Locale.ENGLISH)) {
+      case TimelineUtils.FLOW_NAME_TAG_PREFIX:
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow name: " + parts[1]);
+        }
+        context.setFlowName(parts[1]);
+        break;
+      case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow version: " + parts[1]);
+        }
+        context.setFlowVersion(parts[1]);
+        break;
+      case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow run id: " + parts[1]);
+        }
+        try {
+          context.setFlowRunId(Long.parseLong(parts[1]));
+        } catch (NumberFormatException e) {
+          // Tags are free-form strings; keep the run id set above instead of
+          // failing on a malformed value.
+          LOG.warn("Ignoring non-numeric flow run id '" + parts[1]
+              + "' in the tags of app " + appId, e);
+        }
+        break;
+      default:
+        break;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/package-info.java
new file mode 100644
index 0000000..c470011
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.timelineservice
+ * contains classes related to handling of app level collectors.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 075df47..c3d04de 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -70,6 +70,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -190,9 +191,11 @@ public class TestAppManager{
     public int getCompletedAppsListSize() {
       return super.getCompletedAppsListSize();
     }
+
     public int getCompletedAppsInStateStore() {
       return this.completedAppsInStateStore;
     }
+
     public void submitApplication(
         ApplicationSubmissionContext submissionContext, String user)
             throws YarnException, IOException {
@@ -223,6 +226,8 @@ public class TestAppManager{
     long now = System.currentTimeMillis();
 
     rmContext = mockRMContext(1, now - 10);
+    rmContext
+        .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
     ResourceScheduler scheduler = mockResourceScheduler();
     ((RMContextImpl)rmContext).setScheduler(scheduler);
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 88a4637..ef89509 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -140,6 +141,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Clock;
@@ -637,6 +639,9 @@ public class TestClientRMService {
         new EventHandler<Event>() {
           public void handle(Event event) {}
         });
+    doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
+        .getRMTimelineCollectorManager();
+
     ApplicationId appId1 = getApplicationId(100);
 
     ApplicationACLsManager mockAclsManager = 
mock(ApplicationACLsManager.class);
@@ -652,6 +657,7 @@ public class TestClientRMService {
     ClientRMService rmService =
         new ClientRMService(rmContext, yarnScheduler, appManager,
             mockAclsManager, mockQueueACLsManager, null);
+    rmService.init(new Configuration());
 
     // without name and queue
 
@@ -726,6 +732,9 @@ public class TestClientRMService {
     mockRMContext(yarnScheduler, rmContext);
     RMStateStore stateStore = mock(RMStateStore.class);
     when(rmContext.getStateStore()).thenReturn(stateStore);
+    doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
+    .getRMTimelineCollectorManager();
+
     RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
         null, mock(ApplicationACLsManager.class), new Configuration());
     when(rmContext.getDispatcher().getEventHandler()).thenReturn(
@@ -742,6 +751,7 @@ public class TestClientRMService {
     ClientRMService rmService =
         new ClientRMService(rmContext, yarnScheduler, appManager,
             mockAclsManager, mockQueueACLsManager, null);
+    rmService.init(new Configuration());
 
     // Initialize appnames and queues
     String[] queues = {QUEUE_1, QUEUE_2};
@@ -899,10 +909,13 @@ public class TestClientRMService {
     };
 
     when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler);
-      
+    doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
+        .getRMTimelineCollectorManager();
+
     final ClientRMService rmService =
         new ClientRMService(rmContext, yarnScheduler, appManager, null, null,
             null);
+    rmService.init(new Configuration());
 
     // submit an app and wait for it to block while in app submission
     Thread t = new Thread() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 9162258..b271b37 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -109,6 +109,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
@@ -1126,6 +1127,68 @@ public class TestRMRestart extends 
ParameterizedSchedulerTestBase {
   }
 
   @Test (timeout = 60000)
+  public void testRMRestartTimelineCollectorContext() throws Exception {
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationStateData> rmAppState =
+        rmState.getApplicationState();
+    MockRM rm1 = null;
+    MockRM rm2 = null;
+    try {
+      rm1 = createMockRM(conf, memStore);
+      rm1.start();
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+      nm1.registerNode();
+
+      // submit an app.
+      RMApp app = rm1.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+          null);
+      // Check if app info has been saved.
+      ApplicationStateData appState = rmAppState.get(app.getApplicationId());
+      Assert.assertNotNull(appState);
+      Assert.assertEquals(0, appState.getAttemptCount());
+      Assert.assertEquals(appState.getApplicationSubmissionContext()
+          .getApplicationId(), app.getApplicationSubmissionContext()
+          .getApplicationId());
+
+      // Allocate the AM
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt = app.getCurrentAppAttempt();
+      ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
+      rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+
+      ApplicationId appId = app.getApplicationId();
+      TimelineCollectorContext contextBeforeRestart =
+          rm1.getRMContext().getRMTimelineCollectorManager().get(appId).
+              getTimelineEntityContext();
+
+      // Restart RM.
+      rm2 = createMockRM(conf, memStore);
+      rm2.start();
+      Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+      rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+      TimelineCollectorContext contextAfterRestart =
+          rm2.getRMContext().getRMTimelineCollectorManager().get(appId).
+              getTimelineEntityContext();
+      Assert.assertEquals("Collector contexts for an app should be same " +
+          "across restarts", contextBeforeRestart, contextAfterRestart);
+    } finally {
+      conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
+      if (rm1 != null) {
+        rm1.close();
+      }
+      if (rm2 != null) {
+        rm2.close();
+      }
+    }
+  }
+
+  @Test (timeout = 60000)
   public void testDelegationTokenRestoredInDelegationTokenRenewer()
       throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index eb0d5f1..098ba54 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -61,6 +62,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
@@ -68,8 +70,11 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -864,6 +869,87 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     checkRebootedNMCount(rm, ++initialMetricCount);
   }
 
+  /**
+   * Checks the app-collectors map returned in node heartbeat responses.
+   * Two nodes are registered and two applications are submitted, each with
+   * its own collector address. After a running container of app1 is reported
+   * on node1 and one of app2 on node2, each node's next heartbeat response is
+   * expected to map only its own application to that application's collector
+   * address.
+   */
+  @Test
+  public void testNodeHeartbeatForAppCollectorsMap() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    // set version to 2
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    // enable aux-service based timeline collectors
+    conf.set(YarnConfiguration.NM_AUX_SERVICES, "timeline_collector");
+    conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
+        + "timeline_collector" + ".class",
+        PerNodeTimelineCollectorsAuxService.class.getName());
+
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:1234", 2048);
+
+    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+
+    RMNodeImpl node1 =
+        (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    RMNodeImpl node2 =
+        (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    RMApp app1 = rm.submitApp(1024);
+    String collectorAddr1 = "1.2.3.4:5";
+    app1.setCollectorAddr(collectorAddr1);
+
+    String collectorAddr2 = "5.4.3.2:1";
+    RMApp app2 = rm.submitApp(1024);
+    app2.setCollectorAddr(collectorAddr2);
+
+    // Create a running container for app1 running on nm1
+    ContainerId runningContainerId1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+        app1.getApplicationId(), 0), 0);
+
+    ContainerStatus status1 = ContainerStatus.newInstance(runningContainerId1,
+        ContainerState.RUNNING, "", 0);
+    List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+    statusList.add(status1);
+    NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
+        "", System.currentTimeMillis());
+    NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0,
+        statusList, null, nodeHealth, null, null, null);
+    // Deliver the status straight to the RMNode so app1 becomes a running
+    // app on node1.
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus,
+        nodeHeartbeat1));
+
+    Assert.assertEquals(1, node1.getRunningApps().size());
+    Assert.assertEquals(app1.getApplicationId(),
+        node1.getRunningApps().get(0));
+
+    // Create a running container for app2 running on nm2
+    ContainerId runningContainerId2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+        app2.getApplicationId(), 0), 0);
+
+    ContainerStatus status2 = ContainerStatus.newInstance(runningContainerId2,
+        ContainerState.RUNNING, "", 0);
+    statusList = new ArrayList<ContainerStatus>();
+    statusList.add(status2);
+    // The status is handled by node2, so it must carry nm2's node id.
+    nodeStatus = NodeStatus.newInstance(nm2.getNodeId(), 0,
+        statusList, null, nodeHealth, null, null, null);
+    node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus,
+        nodeHeartbeat2));
+    Assert.assertEquals(1, node2.getRunningApps().size());
+    Assert.assertEquals(app2.getApplicationId(),
+        node2.getRunningApps().get(0));
+
+    // Each node's heartbeat response should list only its own app's collector.
+    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    Map<ApplicationId, String> map1 = nodeHeartbeat1.getAppCollectorsMap();
+    Assert.assertEquals(1, map1.size());
+    Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId()));
+
+    nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    Map<ApplicationId, String> map2 = nodeHeartbeat2.getAppCollectorsMap();
+    Assert.assertEquals(1, map2.size());
+    Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId()));
+  }
+
   private void checkRebootedNMCount(MockRM rm2, int count)
       throws InterruptedException {
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 560a305..19ee0b1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -95,6 +95,18 @@ public abstract class MockAsm extends MockApps {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
+    public String getCollectorAddr() {
+      // Stub: this mock application base does not provide a collector
+      // address; calling it always fails.
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    @Override
+    public void setCollectorAddr(String collectorAddr) {
+      // Stub: the given address is ignored; calling this always fails.
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    @Override
+    public void removeCollectorAddr() {
+      // Stub: there is no collector address to remove; calling this always
+      // fails.
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    @Override
     public ApplicationId getApplicationId() {
+      // Stub: this mock application base has no id; calling this always fails.
       throw new UnsupportedOperationException("Not supported yet.");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 087199d..7dafc22 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -57,6 +57,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -89,9 +90,12 @@ public class TestRMAppLogAggregationStatus {
     rmContext =
         new RMContextImpl(rmDispatcher, null, null, null,
           null, null, null, null, null);
-    rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
+    rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
     
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
 
+    rmContext
+        .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
+
     scheduler = mock(YarnScheduler.class);
     doAnswer(
         new Answer<Void>() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index 9ea2baa..1e279d5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -68,14 +68,14 @@ import org.junit.Test;
 public class TestSystemMetricsPublisher {
 
   private static ApplicationHistoryServer timelineServer;
-  private static SystemMetricsPublisher metricsPublisher;
+  private static TimelineServiceV1Publisher metricsPublisher;
   private static TimelineStore store;
 
   @BeforeClass
   public static void setup() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
         MemoryTimelineStore.class, TimelineStore.class);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
@@ -89,7 +89,7 @@ public class TestSystemMetricsPublisher {
     timelineServer.start();
     store = timelineServer.getTimelineStore();
 
-    metricsPublisher = new SystemMetricsPublisher();
+    metricsPublisher = new TimelineServiceV1Publisher();
     metricsPublisher.init(conf);
     metricsPublisher.start();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
new file mode 100644
index 0000000..3ea4714
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSystemMetricsPublisherForV2 {
+
+  /**
+   * The folder where the FileSystemTimelineWriterImpl writes the entities.
+   */
+  private static File testRootDir = new File("target",
+      TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
+      .getAbsoluteFile();
+
+  private static TimelineServiceV2Publisher metricsPublisher;
+  private static DrainDispatcher dispatcher = new DrainDispatcher();
+
+  private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
+
+  private static RMTimelineCollectorManager rmTimelineCollectorManager;
+
+  /**
+   * One-time fixture shared by all tests in this class. It clears any stale
+   * output directory, wires a mocked RMContext to a real
+   * RMTimelineCollectorManager configured with the file-system timeline
+   * writer, and starts a TimelineServiceV2Publisher whose dispatcher is
+   * replaced by the class-level DrainDispatcher.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    if (testRootDir.exists()) {
+      // Remove output left behind by an earlier run.
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testRootDir.getAbsolutePath()), true);
+    }
+
+    // The mocked context exposes the shared app map and the collector manager;
+    // the manager itself is constructed with that same context.
+    RMContext rmContext = mock(RMContext.class);
+    rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
+    when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
+    rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext);
+    when(rmContext.getRMTimelineCollectorManager()).thenReturn(
+        rmTimelineCollectorManager);
+
+    // Use the file-system writer so the tests can read back what was
+    // published.
+    Configuration conf = getTimelineV2Conf();
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+    rmTimelineCollectorManager.init(conf);
+    rmTimelineCollectorManager.start();
+
+    dispatcher.init(conf);
+    dispatcher.start();
+    // Hand the publisher the drain dispatcher; the tests call
+    // dispatcher.await() before inspecting the output files.
+    metricsPublisher = new TimelineServiceV2Publisher(rmContext) {
+      @Override
+      protected Dispatcher getDispatcher() {
+        return dispatcher;
+      }
+    };
+    metricsPublisher.init(conf);
+    metricsPublisher.start();
+  }
+
+  /**
+   * Stops the collector manager and the publisher, then removes the test
+   * output directory. The directory is removed last, and in a finally block,
+   * so that it is not deleted while the services are still running and so
+   * that it is cleaned up even when stopping a service fails.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      if (rmTimelineCollectorManager != null) {
+        rmTimelineCollectorManager.stop();
+      }
+      if (metricsPublisher != null) {
+        metricsPublisher.stop();
+      }
+    } finally {
+      if (testRootDir.exists()) {
+        FileContext.getLocalFSFileContext().delete(
+            new Path(testRootDir.getAbsolutePath()), true);
+      }
+    }
+  }
+
+  /**
+   * Creates a fresh configuration for these tests: timeline service enabled
+   * at version 2.0, system metrics publishing on with a dispatcher pool size
+   * of 2, RM container events published, and the file-system writer's storage
+   * root pointed at the test directory. The test fails if the canonical path
+   * of that directory cannot be resolved.
+   */
+  private static Configuration getTimelineV2Conf() {
+    final Configuration v2Conf = new Configuration();
+    v2Conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    v2Conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    v2Conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+        true);
+    v2Conf.setInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
+    v2Conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
+        true);
+    try {
+      final String storageRoot = testRootDir.getCanonicalPath();
+      v2Conf.set(
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+          storageRoot);
+    } catch (IOException ioe) {
+      ioe.printStackTrace();
+      Assert.fail("Exception while setting the "
+          + "TIMELINE_SERVICE_STORAGE_DIR_ROOT ");
+    }
+    return v2Conf;
+  }
+
+  /**
+   * Checks how the publisher picks up the container-events setting: with the
+   * default value it must not publish container events, and with the test
+   * configuration it must report publishing them both after init and after
+   * start.
+   */
+  @Test
+  public void testSystemMetricPublisherInitialization() {
+    @SuppressWarnings("resource")
+    TimelineServiceV2Publisher v2Publisher =
+        new TimelineServiceV2Publisher(mock(RMContext.class));
+    try {
+      // First publisher: container events reset to the default value.
+      Configuration publisherConf = getTimelineV2Conf();
+      publisherConf.setBoolean(
+          YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
+          YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_EVENTS_ENABLED);
+      v2Publisher.init(publisherConf);
+      final boolean publishesWithDefault =
+          v2Publisher.isPublishContainerEvents();
+      assertFalse(
+          "Default configuration should not publish container events from RM",
+          publishesWithDefault);
+
+      v2Publisher.stop();
+
+      // Second publisher: unmodified test configuration.
+      v2Publisher = new TimelineServiceV2Publisher(mock(RMContext.class));
+      publisherConf = getTimelineV2Conf();
+      v2Publisher.init(publisherConf);
+      final boolean publishesAfterInit =
+          v2Publisher.isPublishContainerEvents();
+      assertTrue("Expected to have registered event handlers and set ready "
+          + "to publish events after init", publishesAfterInit);
+      v2Publisher.start();
+      final boolean publishesAfterStart =
+          v2Publisher.isPublishContainerEvents();
+      assertTrue("Expected to publish container events from RM",
+          publishesAfterStart);
+    } finally {
+      v2Publisher.stop();
+    }
+  }
+
+  /**
+   * Publishes created, ACLs-updated and finished events for one application
+   * and checks that the application entity file exists and passes
+   * verifyEntity with an expected count of 3.
+   */
+  @Test(timeout = 10000)
+  public void testPublishApplicationMetrics() throws Exception {
+    final ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+    final RMApp rmApp = createAppAndRegister(applicationId);
+
+    metricsPublisher.appCreated(rmApp, rmApp.getStartTime());
+    metricsPublisher.appACLsUpdated(rmApp, "user1,user2", 4L);
+    metricsPublisher.appFinished(rmApp, RMAppState.FINISHED,
+        rmApp.getFinishTime());
+    // Wait until the dispatcher has drained all publish events.
+    dispatcher.await();
+
+    final String appEntityDir = getTimelineEntityDir(rmApp) + "/"
+        + TimelineEntityType.YARN_APPLICATION + "/";
+    Assert.assertTrue(new File(appEntityDir).isDirectory());
+
+    // The entity file is named after the entity id plus the writer's
+    // storage extension.
+    final String entityFileName = applicationId.toString()
+        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+    final File entityFile = new File(appEntityDir, entityFileName);
+    Assert.assertTrue(entityFile.exists());
+    verifyEntity(entityFile, 3,
+        ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+  }
+
+  /**
+   * Publishes registered and finished events for one application attempt and
+   * checks that the attempt entity file exists and passes verifyEntity with
+   * an expected count of 2. The application is reused from the shared map
+   * when it is already there.
+   */
+  @Test(timeout = 10000)
+  public void testPublishAppAttemptMetrics() throws Exception {
+    final ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+    RMApp rmApp = rmAppsMapInContext.get(applicationId);
+    if (rmApp == null) {
+      rmApp = createAppAndRegister(applicationId);
+    }
+    final ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(applicationId, 1);
+    final RMAppAttempt attempt = createRMAppAttempt(attemptId);
+    final long registeredTime = Integer.MAX_VALUE + 1L;
+    final long finishedTime = Integer.MAX_VALUE + 2L;
+
+    metricsPublisher.appAttemptRegistered(attempt, registeredTime);
+    when(rmApp.getFinalApplicationStatus()).thenReturn(
+        FinalApplicationStatus.UNDEFINED);
+    metricsPublisher.appAttemptFinished(attempt, RMAppAttemptState.FINISHED,
+        rmApp, finishedTime);
+
+    // Wait until the dispatcher has drained all publish events.
+    dispatcher.await();
+
+    final String attemptEntityDir = getTimelineEntityDir(rmApp) + "/"
+        + TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/";
+    Assert.assertTrue(new File(attemptEntityDir).isDirectory());
+
+    // The entity file is named after the entity id plus the writer's
+    // storage extension.
+    final String entityFileName = attemptId.toString()
+        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+    final File entityFile = new File(attemptEntityDir, entityFileName);
+    Assert.assertTrue(entityFile.exists());
+    verifyEntity(entityFile, 2,
+        AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+  }
+
+  /**
+   * Publishes created and finished events for one container and checks that
+   * the container entity file exists and passes verifyEntity with an expected
+   * count of 2. The application is reused from the shared map when it is
+   * already there.
+   */
+  @Test(timeout = 10000)
+  public void testPublishContainerMetrics() throws Exception {
+    final ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+    RMApp rmApp = rmAppsMapInContext.get(applicationId);
+    if (rmApp == null) {
+      rmApp = createAppAndRegister(applicationId);
+    }
+    final ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(applicationId, 1);
+    final ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
+    final RMContainer rmContainer = createRMContainer(containerId);
+
+    metricsPublisher.containerCreated(rmContainer,
+        rmContainer.getCreationTime());
+    metricsPublisher.containerFinished(rmContainer,
+        rmContainer.getFinishTime());
+    // Wait until the dispatcher has drained all publish events.
+    dispatcher.await();
+
+    final String containerEntityDir = getTimelineEntityDir(rmApp) + "/"
+        + TimelineEntityType.YARN_CONTAINER + "/";
+    Assert.assertTrue(new File(containerEntityDir).isDirectory());
+
+    // The entity file is named after the entity id plus the writer's
+    // storage extension.
+    final String entityFileName = containerId.toString()
+        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+    final File entityFile = new File(containerEntityDir, entityFileName);
+    Assert.assertTrue(entityFile.exists());
+    verifyEntity(entityFile, 2,
+        ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
+  }
+
+  /**
+   * Creates a mocked application for the given id, adds it to the shared app
+   * map and registers an app-level timeline collector for it, each only if
+   * absent.
+   *
+   * @param appId id of the application to create
+   * @return the mocked application
+   */
+  private RMApp createAppAndRegister(ApplicationId appId) {
+    final RMApp mockedApp = createRMApp(appId);
+
+    // These registrations are normally taken care of in RMAppImpl.
+    rmAppsMapInContext.putIfAbsent(appId, mockedApp);
+    rmTimelineCollectorManager.putIfAbsent(appId,
+        new AppLevelTimelineCollector(appId));
+    return mockedApp;
+  }
+
+  /**
+   * Reads an entity file line by line and verifies its content. Every
+   * non-blank line is parsed as one JSON-serialized TimelineEntity and
+   * counted. If an entity carries an event whose id equals
+   * {@code eventForCreatedTime}, its created time must be positive.
+   *
+   * @param entityFile file written by the timeline writer
+   * @param expectedEvents expected number of non-blank lines in the file
+   * @param eventForCreatedTime event id that triggers the created-time check
+   * @throws IOException if the file cannot be opened or read
+   */
+  private static void verifyEntity(File entityFile, long expectedEvents,
+      String eventForCreatedTime) throws IOException {
+    BufferedReader reader = null;
+    String strLine;
+    long count = 0;
+    try {
+      reader = new BufferedReader(new FileReader(entityFile));
+      while ((strLine = reader.readLine()) != null) {
+        if (strLine.trim().length() > 0) {
+          TimelineEntity entity = FileSystemTimelineReaderImpl.
+              getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
+          for (TimelineEvent event : entity.getEvents()) {
+            if (event.getId().equals(eventForCreatedTime)) {
+              assertTrue(entity.getCreatedTime() > 0);
+              break;
+            }
+          }
+          count++;
+        }
+      }
+    } finally {
+      // reader is still null when the file could not be opened; closing it
+      // unconditionally would replace the real failure with an NPE.
+      if (reader != null) {
+        reader.close();
+      }
+    }
+    // JUnit expects (message, expected, actual).
+    assertEquals("Expected " + expectedEvents + " events to be published",
+        expectedEvents, count);
+  }
+
+  /**
+   * Builds the directory path under the test root for the given application:
+   * entities dir, default cluster id, user, application name, default flow
+   * version, application start time and application id, joined by "/".
+   *
+   * @param app application whose user, name, start time and id are used
+   * @return the path, without a trailing separator
+   */
+  private String getTimelineEntityDir(RMApp app) {
+    final String userDir = testRootDir.getAbsolutePath() + "/"
+        + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+        + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
+        + app.getUser();
+    final String flowDir = userDir + "/"
+        + app.getName() + "/"
+        + TimelineUtils.DEFAULT_FLOW_VERSION;
+    return flowDir + "/"
+        + app.getStartTime() + "/"
+        + app.getApplicationId();
+  }
+
+  /**
+   * Creates a Mockito mock of RMAppImpl for the given id, stubbed with fixed
+   * test values for identity, times, diagnostics, current attempt, final
+   * status, metrics, tags and submission context.
+   *
+   * @param appId id the mocked application reports
+   * @return the stubbed mock
+   */
+  private static RMApp createRMApp(ApplicationId appId) {
+    final RMApp mockedApp = mock(RMAppImpl.class);
+
+    // Identity and descriptive fields.
+    when(mockedApp.getApplicationId()).thenReturn(appId);
+    when(mockedApp.getName()).thenReturn("test app");
+    when(mockedApp.getApplicationType()).thenReturn("test app type");
+    when(mockedApp.getUser()).thenReturn("testUser");
+    when(mockedApp.getQueue()).thenReturn("test queue");
+
+    // Timestamps just beyond the int range.
+    when(mockedApp.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
+    when(mockedApp.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
+    when(mockedApp.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
+    when(mockedApp.getDiagnostics()).thenReturn(
+        new StringBuilder("test diagnostics info"));
+
+    // Current attempt with id 1 of this application.
+    final RMAppAttempt currentAttempt = mock(RMAppAttempt.class);
+    when(currentAttempt.getAppAttemptId()).thenReturn(
+        ApplicationAttemptId.newInstance(appId, 1));
+    when(mockedApp.getCurrentAppAttempt()).thenReturn(currentAttempt);
+
+    when(mockedApp.getFinalApplicationStatus()).thenReturn(
+        FinalApplicationStatus.UNDEFINED);
+    when(mockedApp.getRMAppMetrics()).thenReturn(
+        new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE));
+    when(mockedApp.getApplicationTags()).thenReturn(
+        Collections.<String> emptySet());
+
+    // Submission context that only answers the priority query.
+    final ApplicationSubmissionContext submissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(submissionContext.getPriority())
+        .thenReturn(Priority.newInstance(0));
+    when(mockedApp.getApplicationSubmissionContext())
+        .thenReturn(submissionContext);
+    return mockedApp;
+  }
+
+  /**
+   * Creates a Mockito mock of RMAppAttempt for the given attempt id, stubbed
+   * with a fixed host, RPC port, master container, diagnostics and tracking
+   * URLs.
+   *
+   * @param appAttemptId id the mocked attempt reports
+   * @return the stubbed mock
+   */
+  private static RMAppAttempt createRMAppAttempt(
+      ApplicationAttemptId appAttemptId) {
+    final RMAppAttempt mockedAttempt = mock(RMAppAttempt.class);
+    when(mockedAttempt.getAppAttemptId()).thenReturn(appAttemptId);
+    when(mockedAttempt.getHost()).thenReturn("test host");
+    when(mockedAttempt.getRpcPort()).thenReturn(-100);
+
+    // Master container is container 1 of this attempt.
+    final Container masterContainer = mock(Container.class);
+    final ContainerId masterContainerId =
+        ContainerId.newContainerId(appAttemptId, 1);
+    when(masterContainer.getId()).thenReturn(masterContainerId);
+    when(mockedAttempt.getMasterContainer()).thenReturn(masterContainer);
+
+    when(mockedAttempt.getDiagnostics()).thenReturn("test diagnostics info");
+    when(mockedAttempt.getTrackingUrl()).thenReturn("test tracking url");
+    when(mockedAttempt.getOriginalTrackingUrl()).thenReturn(
+        "test original tracking url");
+    return mockedAttempt;
+  }
+
+  /**
+   * Creates a Mockito mock of RMContainer for the given container id, stubbed
+   * with a fixed node, resource, priority, creation and finish times,
+   * diagnostics, exit status, COMPLETE state and a nested mocked Container
+   * that reports a node HTTP address.
+   *
+   * @param containerId id the mocked container reports
+   * @return the stubbed mock
+   */
+  private static RMContainer createRMContainer(ContainerId containerId) {
+    RMContainer container = mock(RMContainer.class);
+    when(container.getContainerId()).thenReturn(containerId);
+    when(container.getAllocatedNode()).thenReturn(
+        NodeId.newInstance("test host", -100));
+    when(container.getAllocatedResource()).thenReturn(
+        Resource.newInstance(-1, -1));
+    when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
+    when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L);
+    when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L);
+    when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
+    when(container.getContainerExitStatus()).thenReturn(-1);
+    when(container.getContainerState()).thenReturn(ContainerState.COMPLETE);
+    Container mockContainer = mock(Container.class);
+    when(container.getContainer()).thenReturn(mockContainer);
+    // The URL literal ends right after the port; nothing follows the closing
+    // quote except the closing parenthesis.
+    when(mockContainer.getNodeHttpAddress())
+      .thenReturn("http://localhost:1234");
+    return container;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 35fd0c3..62a5c52 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -301,4 +301,18 @@ public class MockRMApp implements RMApp {
   public CallerContext getCallerContext() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
+
+  /**
+   * Not implemented by this mock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public String getCollectorAddr() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public void removeCollectorAddr() {
+    // Stub: not implemented by this mock; calling it always fails.
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public void setCollectorAddr(String collectorAddr) {
+    // Stub: the given address is ignored; calling this always fails.
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index bb5c25c..9131643 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -80,6 +80,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -225,6 +226,9 @@ public class TestRMAppTransitions {
               .getAppResourceUsageReport((ApplicationAttemptId)Matchers.any());
     doReturn(resourceScheduler).when(rmContext).getScheduler();
 
+    doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
+        .getRMTimelineCollectorManager();
+
     rmDispatcher.register(RMAppAttemptEventType.class,
         new TestApplicationAttemptEventDispatcher(this.rmContext));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
index eb752d1..e3e58d5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
@@ -56,6 +56,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -88,6 +93,18 @@
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minikdc</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 0f239c0..67b652b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.Shell;
@@ -544,6 +545,8 @@ public class MiniYARNCluster extends CompositeService {
           MiniYARNCluster.getHostname() + ":0");
       config.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
           MiniYARNCluster.getHostname() + ":0");
+      config.set(YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+          MiniYARNCluster.getHostname() + ":0");
       WebAppUtils
           .setNMWebAppHostNameAndPort(config,
               MiniYARNCluster.getHostname(), 0);
@@ -782,8 +785,8 @@ public class MiniYARNCluster extends CompositeService {
       if (!useFixedPorts) {
         String hostname = MiniYARNCluster.getHostname();
         conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0");
-        conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, hostname
-            + ":0");
+        conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+            hostname + ":" + ServerSocketUtil.getPort(9188, 10));
       }
       appHistoryServer.init(conf);
       super.serviceInit(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
index e67a236..9226ead 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java
@@ -74,8 +74,6 @@ public class TestMiniYarnCluster {
       String hostname = MiniYARNCluster.getHostname();
       Assert.assertEquals(hostname + ":0",
         conf.get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS));
-      Assert.assertEquals(hostname + ":0",
-        conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS));
 
       cluster.start();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
new file mode 100644
index 0000000..5a63547
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice;
+
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import 
org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration test that pushes entities through a {@link TimelineClient} to
+ * a {@link PerNodeTimelineCollectorsAuxService} launched in-process with
+ * timeline service v.2 enabled.
+ */
+public class TestTimelineServiceClientIntegration {
+  /**
+   * Application id shared by the whole test class: it is registered with the
+   * aux service in {@link #setupClass()} and used to create every client.
+   */
+  private static final ApplicationId APP_ID = ApplicationId.newInstance(0, 1);
+
+  private static NodeTimelineCollectorManager collectorManager;
+  private static PerNodeTimelineCollectorsAuxService auxService;
+  private static Configuration conf;
+
+  /**
+   * Launches the per-node collector aux service on top of a
+   * {@link MockNodeTimelineCollectorManager} and registers {@link #APP_ID}
+   * with it.
+   *
+   * @throws Exception if the service cannot be launched
+   */
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    try {
+      collectorManager = new MockNodeTimelineCollectorManager();
+      conf = new YarnConfiguration();
+      // enable timeline service v.2
+      conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+      conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+          FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+      auxService =
+          PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
+              collectorManager, conf);
+      auxService.addApplication(APP_ID);
+    } catch (ExitUtil.ExitException e) {
+      // Keep the cause so the test report shows why the launch aborted.
+      throw new AssertionError("Failed to launch collector aux service", e);
+    }
+  }
+
+  /**
+   * Stops the aux service if {@link #setupClass()} managed to launch it.
+   *
+   * @throws Exception if stopping the service fails
+   */
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    if (auxService != null) {
+      auxService.stop();
+    }
+  }
+
+  /**
+   * Sends a generic entity carrying one time-series metric through both
+   * {@code putEntities} and {@code putEntitiesAsync}.
+   *
+   * @throws Exception if the client cannot be started or a put fails
+   */
+  @Test
+  public void testPutEntities() throws Exception {
+    TimelineClient client = TimelineClient.createTimelineClient(APP_ID);
+    try {
+      startClient(client);
+      TimelineEntity entity = new TimelineEntity();
+      entity.setType("test entity type");
+      entity.setId("test entity id");
+      TimelineMetric metric =
+          new TimelineMetric(TimelineMetric.Type.TIME_SERIES);
+      metric.setId("test metric id");
+      metric.addValue(1L, 1.0D);
+      metric.addValue(2L, 2.0D);
+      entity.addMetric(metric);
+      client.putEntities(entity);
+      client.putEntitiesAsync(entity);
+    } finally {
+      client.stop();
+    }
+  }
+
+  /**
+   * Sends one entity of each specialized type (cluster, flow run,
+   * application, application attempt, container, user, queue) through both
+   * {@code putEntities} and {@code putEntitiesAsync}.
+   *
+   * @throws Exception if the client cannot be started or a put fails
+   */
+  @Test
+  public void testPutExtendedEntities() throws Exception {
+    TimelineClient client = TimelineClient.createTimelineClient(APP_ID);
+    try {
+      startClient(client);
+      ClusterEntity cluster = new ClusterEntity();
+      cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+      FlowRunEntity flow = new FlowRunEntity();
+      flow.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+      flow.setName("test_flow_name");
+      flow.setVersion("test_flow_version");
+      flow.setRunId(1L);
+      flow.setParent(cluster.getType(), cluster.getId());
+      ApplicationEntity app = new ApplicationEntity();
+      app.setId(APP_ID.toString());
+      flow.addChild(app.getType(), app.getId());
+      ApplicationAttemptId attemptId =
+          ApplicationAttemptId.newInstance(APP_ID, 1);
+      ApplicationAttemptEntity appAttempt = new ApplicationAttemptEntity();
+      appAttempt.setId(attemptId.toString());
+      ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
+      ContainerEntity container = new ContainerEntity();
+      container.setId(containerId.toString());
+      UserEntity user = new UserEntity();
+      user.setId(UserGroupInformation.getCurrentUser().getShortUserName());
+      QueueEntity queue = new QueueEntity();
+      queue.setId("default_queue");
+      client.putEntities(cluster, flow, app, appAttempt, container, user,
+          queue);
+      client.putEntitiesAsync(cluster, flow, app, appAttempt, container, user,
+          queue);
+    } finally {
+      client.stop();
+    }
+  }
+
+  /**
+   * Points the client at the REST address reported by the collector manager,
+   * then initializes and starts it. The caller remains responsible for
+   * stopping the client.
+   *
+   * @param client the client to configure and start
+   * @throws Exception if the client cannot be configured or started
+   */
+  private static void startClient(TimelineClient client) throws Exception {
+    // set the timeline service address manually
+    client.setTimelineServiceAddress(
+        collectorManager.getRestServerBindAddress());
+    client.init(conf);
+    client.start();
+  }
+
+  /**
+   * Collector manager whose NM collector service is a Mockito mock that
+   * answers every collector context request with a fixed context built from
+   * the current user and the test flow name, version and run id.
+   */
+  private static class MockNodeTimelineCollectorManager extends
+      NodeTimelineCollectorManager {
+    public MockNodeTimelineCollectorManager() {
+      super();
+    }
+
+    @Override
+    protected CollectorNodemanagerProtocol getNMCollectorService() {
+      CollectorNodemanagerProtocol protocol =
+          mock(CollectorNodemanagerProtocol.class);
+      try {
+        GetTimelineCollectorContextResponse response =
+            GetTimelineCollectorContextResponse.newInstance(
+                UserGroupInformation.getCurrentUser().getShortUserName(),
+                "test_flow_name", "test_flow_version", 1L);
+        when(protocol.getTimelineCollectorContext(any(
+            GetTimelineCollectorContextRequest.class))).thenReturn(response);
+      } catch (YarnException | IOException e) {
+        // Report what went wrong instead of failing without any message.
+        fail("Could not stub the NM collector service: " + e);
+      }
+      return protocol;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to