YARN-3210. Refactored timeline aggregator according to new code organization 
proposed in YARN-3166. Contributed by Li Lu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3ff7f06
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3ff7f06
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3ff7f06

Branch: refs/heads/YARN-2928
Commit: d3ff7f06cbc66d3a23c2551e7d4c752689f46afe
Parents: e4d81eb
Author: Zhijie Shen <zjs...@apache.org>
Authored: Tue Mar 3 11:21:03 2015 -0800
Committer: Zhijie Shen <zjs...@apache.org>
Committed: Tue Mar 3 11:25:17 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../distributedshell/TestDistributedShell.java  |   4 +-
 .../hadoop-yarn-server-nodemanager/pom.xml      |   5 -
 .../server/nodemanager/webapp/WebServer.java    |   3 -
 .../TestTimelineServiceClientIntegration.java   |  12 +-
 .../aggregator/AppLevelAggregatorService.java   |  57 ----
 .../aggregator/AppLevelServiceManager.java      | 136 ----------
 .../AppLevelServiceManagerProvider.java         |  33 ---
 .../aggregator/AppLevelTimelineAggregator.java  |  57 ++++
 .../aggregator/BaseAggregatorService.java       | 107 --------
 .../aggregator/PerNodeAggregatorServer.java     | 268 -------------------
 .../aggregator/PerNodeAggregatorWebService.java | 180 -------------
 .../PerNodeTimelineAggregatorsAuxService.java   | 212 +++++++++++++++
 .../aggregator/TimelineAggregator.java          | 107 ++++++++
 .../TimelineAggregatorWebService.java           | 180 +++++++++++++
 .../TimelineAggregatorsCollection.java          | 203 ++++++++++++++
 .../TestAppLevelAggregatorService.java          |  23 --
 .../aggregator/TestAppLevelServiceManager.java  | 102 -------
 .../TestAppLevelTimelineAggregator.java         |  23 ++
 .../aggregator/TestBaseAggregatorService.java   |  23 --
 .../aggregator/TestPerNodeAggregatorServer.java | 149 -----------
 ...estPerNodeTimelineAggregatorsAuxService.java | 150 +++++++++++
 .../aggregator/TestTimelineAggregator.java      |  23 ++
 .../TestTimelineAggregatorsCollection.java      | 108 ++++++++
 24 files changed, 1074 insertions(+), 1094 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b13475a..0548460 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -23,6 +23,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3125. Made the distributed shell use timeline service next gen and
     add an integration test for it. (Junping Du and Li Lu via zjshen)
 
+    YARN-3210. Refactored timeline aggregator according to new code
+    organization proposed in YARN-3166. (Li Lu via zjshen)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 71466cb..313dc97 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import 
org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
+import 
org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,7 +96,7 @@ public class TestDistributedShell {
       // enable aux-service based timeline aggregators
       conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
       conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + 
TIMELINE_AUX_SERVICE_NAME
-        + ".class", PerNodeAggregatorServer.class.getName());
+        + ".class", PerNodeTimelineAggregatorsAuxService.class.getName());
     }
     conf.set(YarnConfiguration.RM_SCHEDULER, 
CapacityScheduler.class.getName());
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
index 26a33b4..b1efa5f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
@@ -53,11 +53,6 @@
       <artifactId>hadoop-yarn-api</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
index 77deaed..fdff480 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
@@ -29,9 +29,6 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import 
org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManager;
-import 
org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManagerProvider;
-import 
org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorWebService;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index a5159a2..32ee5d8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -6,7 +6,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import 
org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
+import 
org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -14,13 +14,13 @@ import org.junit.Test;
 import static org.junit.Assert.fail;
 
 public class TestTimelineServiceClientIntegration {
-  private static PerNodeAggregatorServer server;
+  private static PerNodeTimelineAggregatorsAuxService auxService;
 
   @BeforeClass
   public static void setupClass() throws Exception {
     try {
-      server = PerNodeAggregatorServer.launchServer(new String[0]);
-      server.addApplication(ApplicationId.newInstance(0, 1));
+      auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new 
String[0]);
+      auxService.addApplication(ApplicationId.newInstance(0, 1));
     } catch (ExitUtil.ExitException e) {
       fail();
     }
@@ -28,8 +28,8 @@ public class TestTimelineServiceClientIntegration {
 
   @AfterClass
   public static void tearDownClass() throws Exception {
-    if (server != null) {
-      server.stop();
+    if (auxService != null) {
+      auxService.stop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
deleted file mode 100644
index bf72fb9..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage for a given YARN application.
- *
- * App-related lifecycle management is handled by this service.
- */
-@Private
-@Unstable
-public class AppLevelAggregatorService extends BaseAggregatorService {
-  private final String applicationId;
-  // TODO define key metadata such as flow metadata, user, and queue
-
-  public AppLevelAggregatorService(String applicationId) {
-    super(AppLevelAggregatorService.class.getName() + " - " + applicationId);
-    this.applicationId = applicationId;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
deleted file mode 100644
index 05d321f..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-
-/**
- * Class that manages adding and removing app level aggregator services and
- * their lifecycle. It provides thread safety access to the app level services.
- *
- * It is a singleton, and instances should be obtained via
- * {@link #getInstance()}.
- */
-@Private
-@Unstable
-public class AppLevelServiceManager extends CompositeService {
-  private static final Log LOG =
-      LogFactory.getLog(AppLevelServiceManager.class);
-  private static final AppLevelServiceManager INSTANCE =
-      new AppLevelServiceManager();
-
-  // access to this map is synchronized with the map itself
-  private final Map<String,AppLevelAggregatorService> services =
-      Collections.synchronizedMap(
-          new HashMap<String,AppLevelAggregatorService>());
-
-  static AppLevelServiceManager getInstance() {
-    return INSTANCE;
-  }
-
-  AppLevelServiceManager() {
-    super(AppLevelServiceManager.class.getName());
-  }
-
-  /**
-   * Creates and adds an app level aggregator service for the specified
-   * application id. The service is also initialized and started. If the 
service
-   * already exists, no new service is created.
-   *
-   * @throws YarnRuntimeException if there was any exception in initializing 
and
-   * starting the app level service
-   * @return whether it was added successfully
-   */
-  public boolean addService(String appId) {
-    synchronized (services) {
-      AppLevelAggregatorService service = services.get(appId);
-      if (service == null) {
-        try {
-          service = new AppLevelAggregatorService(appId);
-          // initialize, start, and add it to the parent service so it can be
-          // cleaned up when the parent shuts down
-          service.init(getConfig());
-          service.start();
-          services.put(appId, service);
-          LOG.info("the application aggregator service for " + appId +
-              " was added");
-          return true;
-        } catch (Exception e) {
-          throw new YarnRuntimeException(e);
-        }
-      } else {
-        String msg = "the application aggregator service for " + appId +
-            " already exists!";
-        LOG.error(msg);
-        return false;
-      }
-    }
-  }
-
-  /**
-   * Removes the app level aggregator service for the specified application id.
-   * The service is also stopped as a result. If the service does not exist, no
-   * change is made.
-   *
-   * @return whether it was removed successfully
-   */
-  public boolean removeService(String appId) {
-    synchronized (services) {
-      AppLevelAggregatorService service = services.remove(appId);
-      if (service == null) {
-        String msg = "the application aggregator service for " + appId +
-            " does not exist!";
-        LOG.error(msg);
-        return false;
-      } else {
-        // stop the service to do clean up
-        service.stop();
-        LOG.info("the application aggregator service for " + appId +
-            " was removed");
-        return true;
-      }
-    }
-  }
-
-  /**
-   * Returns the app level aggregator service for the specified application id.
-   *
-   * @return the app level aggregator service or null if it does not exist
-   */
-  public AppLevelAggregatorService getService(String appId) {
-    return services.get(appId);
-  }
-
-  /**
-   * Returns whether the app level aggregator service for the specified
-   * application id exists.
-   */
-  public boolean hasService(String appId) {
-    return services.containsKey(appId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
deleted file mode 100644
index 8768575..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import com.google.inject.Provider;
-
-/**
- * A guice provider that provides a global singleton instance of
- * AppLevelServiceManager.
- */
-public class AppLevelServiceManagerProvider
-    implements Provider<AppLevelServiceManager> {
-  @Override
-  public AppLevelServiceManager get() {
-    return AppLevelServiceManager.getInstance();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
new file mode 100644
index 0000000..95ec9f8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage for a given YARN application.
+ *
+ * App-related lifecycle management is handled by this service.
+ */
+@Private
+@Unstable
+public class AppLevelTimelineAggregator extends TimelineAggregator {
+  private final String applicationId;
+  // TODO define key metadata such as flow metadata, user, and queue
+
+  public AppLevelTimelineAggregator(String applicationId) {
+    super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId);
+    this.applicationId = applicationId;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
deleted file mode 100644
index e362139..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage.
- *
- * Classes that extend this can add their own lifecycle management or
- * customization of request handling.
- */
-@Private
-@Unstable
-public class BaseAggregatorService extends CompositeService {
-  private static final Log LOG = 
LogFactory.getLog(BaseAggregatorService.class);
-
-  public BaseAggregatorService(String name) {
-    super(name);
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-  /**
-   * Handles entity writes. These writes are synchronous and are written to the
-   * backing storage without buffering/batching. If any entity already exists,
-   * it results in an update of the entity.
-   *
-   * This method should be reserved for selected critical entities and events.
-   * For normal voluminous writes one should use the async method
-   * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
-   *
-   * @param entities entities to post
-   * @param callerUgi the caller UGI
-   */
-  public void postEntities(TimelineEntities entities,
-      UserGroupInformation callerUgi) {
-    // Add this output temporarily for our prototype
-    // TODO remove this after we have an actual implementation
-    LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE");
-    LOG.info("postEntities(entities=" + entities + ", callerUgi=" +
-        callerUgi + ")");
-
-    // TODO implement
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
-          callerUgi + ")");
-    }
-  }
-
-  /**
-   * Handles entity writes in an asynchronous manner. The method returns as 
soon
-   * as validation is done. No promises are made on how quickly it will be
-   * written to the backing storage or if it will always be written to the
-   * backing storage. Multiple writes to the same entities may be batched and
-   * appropriate values updated and result in fewer writes to the backing
-   * storage.
-   *
-   * @param entities entities to post
-   * @param callerUgi the caller UGI
-   */
-  public void postEntitiesAsync(TimelineEntities entities,
-      UserGroupInformation callerUgi) {
-    // TODO implement
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
-          callerUgi + ")");
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
deleted file mode 100644
index deb21c7..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.lib.StaticUserWebFilter;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.hadoop.yarn.server.api.ContainerContext;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-import org.apache.hadoop.http.HttpServer2;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import static 
org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static 
org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
-
-/**
- * The top-level server for the per-node timeline aggregator service. Currently
- * it is defined as an auxiliary service to accommodate running within another
- * daemon (e.g. node manager).
- */
-@Private
-@Unstable
-public class PerNodeAggregatorServer extends AuxiliaryService {
-  private static final Log LOG =
-      LogFactory.getLog(PerNodeAggregatorServer.class);
-  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
-  static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
-
-  private final AppLevelServiceManager serviceManager;
-  private HttpServer2 timelineRestServer;
-
-  public PerNodeAggregatorServer() {
-    // use the same singleton
-    this(AppLevelServiceManager.getInstance());
-  }
-
-  @VisibleForTesting
-  PerNodeAggregatorServer(AppLevelServiceManager serviceManager) {
-    super("timeline_aggregator");
-    this.serviceManager = serviceManager;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    serviceManager.init(conf);
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-    serviceManager.start();
-    startWebApp();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    if (timelineRestServer != null) {
-      timelineRestServer.stop();
-    }
-    // stop the service manager
-    serviceManager.stop();
-    super.serviceStop();
-  }
-
-  private void startWebApp() {
-    Configuration conf = getConfig();
-    // use the same ports as the old ATS for now; we could create new 
properties
-    // for the new timeline service if needed
-    String bindAddress = WebAppUtils.getWebAppBindURL(conf,
-                          YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
-                          WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
-    LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
-    try {
-      Configuration confForInfoServer = new Configuration(conf);
-      confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
-      HttpServer2.Builder builder = new HttpServer2.Builder()
-          .setName("timeline")
-          .setConf(conf)
-          .addEndpoint(URI.create("http://"; + bindAddress));
-      timelineRestServer = builder.build();
-      // TODO: replace this by an authentication filter in future.
-      HashMap<String, String> options = new HashMap<>();
-      String username = conf.get(HADOOP_HTTP_STATIC_USER,
-          DEFAULT_HADOOP_HTTP_STATIC_USER);
-      options.put(HADOOP_HTTP_STATIC_USER, username);
-      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
-          "static_user_filter_timeline",
-          StaticUserWebFilter.StaticUserFilter.class.getName(),
-          options, new String[] {"/*"});
-
-      timelineRestServer.addJerseyResourcePackage(
-          PerNodeAggregatorWebService.class.getPackage().getName() + ";"
-              + GenericExceptionHandler.class.getPackage().getName() + ";"
-              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
-          "/*");
-      timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
-          AppLevelServiceManager.getInstance());
-      timelineRestServer.start();
-    } catch (Exception e) {
-      String msg = "The per-node aggregator webapp failed to start.";
-      LOG.error(msg, e);
-      throw new YarnRuntimeException(msg, e);
-    }
-  }
-
-  // these methods can be used as the basis for future service methods if the
-  // per-node aggregator runs separate from the node manager
-  /**
-   * Creates and adds an app level aggregator service for the specified
-   * application id. The service is also initialized and started. If the 
service
-   * already exists, no new service is created.
-   *
-   * @return whether it was added successfully
-   */
-  public boolean addApplication(ApplicationId appId) {
-    String appIdString = appId.toString();
-    return serviceManager.addService(appIdString);
-  }
-
-  /**
-   * Removes the app level aggregator service for the specified application id.
-   * The service is also stopped as a result. If the service does not exist, no
-   * change is made.
-   *
-   * @return whether it was removed successfully
-   */
-  public boolean removeApplication(ApplicationId appId) {
-    String appIdString = appId.toString();
-    return serviceManager.removeService(appIdString);
-  }
-
-  /**
-   * Creates and adds an app level aggregator service for the specified
-   * application id. The service is also initialized and started. If the 
service
-   * already exists, no new service is created.
-   */
-  @Override
-  public void initializeContainer(ContainerInitializationContext context) {
-    // intercept the event of the AM container being created and initialize the
-    // app level aggregator service
-    if (isApplicationMaster(context)) {
-      ApplicationId appId = context.getContainerId().
-          getApplicationAttemptId().getApplicationId();
-      addApplication(appId);
-    }
-  }
-
-  /**
-   * Removes the app level aggregator service for the specified application id.
-   * The service is also stopped as a result. If the service does not exist, no
-   * change is made.
-   */
-  @Override
-  public void stopContainer(ContainerTerminationContext context) {
-    // intercept the event of the AM container being stopped and remove the app
-    // level aggregator service
-    if (isApplicationMaster(context)) {
-      ApplicationId appId = context.getContainerId().
-          getApplicationAttemptId().getApplicationId();
-      removeApplication(appId);
-    }
-  }
-
-  private boolean isApplicationMaster(ContainerContext context) {
-    // TODO this is based on a (shaky) assumption that the container id (the
-    // last field of the full container id) for an AM is always 1
-    // we want to make this much more reliable
-    ContainerId containerId = context.getContainerId();
-    return containerId.getContainerId() == 1L;
-  }
-
-  @VisibleForTesting
-  boolean hasApplication(String appId) {
-    return serviceManager.hasService(appId);
-  }
-
-  @Override
-  public void initializeApplication(ApplicationInitializationContext context) {
-  }
-
-  @Override
-  public void stopApplication(ApplicationTerminationContext context) {
-  }
-
-  @Override
-  public ByteBuffer getMetaData() {
-    // TODO currently it is not used; we can return a more meaningful data when
-    // we connect it with an AM
-    return ByteBuffer.allocate(0);
-  }
-
-  @VisibleForTesting
-  public static PerNodeAggregatorServer launchServer(String[] args) {
-    Thread
-      .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
-    StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args,
-        LOG);
-    PerNodeAggregatorServer server = null;
-    try {
-      server = new PerNodeAggregatorServer();
-      ShutdownHookManager.get().addShutdownHook(new ShutdownHook(server),
-          SHUTDOWN_HOOK_PRIORITY);
-      YarnConfiguration conf = new YarnConfiguration();
-      server.init(conf);
-      server.start();
-    } catch (Throwable t) {
-      LOG.fatal("Error starting PerNodeAggregatorServer", t);
-      ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
-    }
-    return server;
-  }
-
-  private static class ShutdownHook implements Runnable {
-    private final PerNodeAggregatorServer server;
-
-    public ShutdownHook(PerNodeAggregatorServer server) {
-      this.server = server;
-    }
-
-    public void run() {
-      server.stop();
-    }
-  }
-
-  public static void main(String[] args) {
-    launchServer(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
deleted file mode 100644
index ffe099e..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.*;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.webapp.ForbiddenException;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
-
-import com.google.inject.Singleton;
-
-/**
- * The main per-node REST end point for timeline service writes. It is
- * essentially a container service that routes requests to the appropriate
- * per-app services.
- */
-@Private
-@Unstable
-@Singleton
-@Path("/ws/v2/timeline")
-public class PerNodeAggregatorWebService {
-  private static final Log LOG =
-      LogFactory.getLog(PerNodeAggregatorWebService.class);
-
-  private @Context ServletContext context;
-
-  @XmlRootElement(name = "about")
-  @XmlAccessorType(XmlAccessType.NONE)
-  @Public
-  @Unstable
-  public static class AboutInfo {
-
-    private String about;
-
-    public AboutInfo() {
-
-    }
-
-    public AboutInfo(String about) {
-      this.about = about;
-    }
-
-    @XmlElement(name = "About")
-    public String getAbout() {
-      return about;
-    }
-
-    public void setAbout(String about) {
-      this.about = about;
-    }
-
-  }
-
-  /**
-   * Return the description of the timeline web services.
-   */
-  @GET
-  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public AboutInfo about(
-      @Context HttpServletRequest req,
-      @Context HttpServletResponse res) {
-    init(res);
-    return new AboutInfo("Timeline API");
-  }
-
-  /**
-   * Accepts writes to the aggregator, and returns a response. It simply routes
-   * the request to the app level aggregator. It expects an application as a
-   * context.
-   */
-  @PUT
-  @Path("/entities")
-  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public Response putEntities(
-      @Context HttpServletRequest req,
-      @Context HttpServletResponse res,
-      @QueryParam("async") String async,
-      @QueryParam("appid") String appId,
-      TimelineEntities entities) {
-    init(res);
-    UserGroupInformation callerUgi = getUser(req);
-    if (callerUgi == null) {
-      String msg = "The owner of the posted timeline entities is not set";
-      LOG.error(msg);
-      throw new ForbiddenException(msg);
-    }
-
-    // TODO how to express async posts and handle them
-    boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
-
-    try {
-      appId = parseApplicationId(appId);
-      if (appId == null) {
-        return Response.status(Response.Status.BAD_REQUEST).build();
-      }
-      AppLevelAggregatorService service = getAggregatorService(req, appId);
-      if (service == null) {
-        LOG.error("Application not found");
-        throw new NotFoundException(); // different exception?
-      }
-      service.postEntities(entities, callerUgi);
-      return Response.ok().build();
-    } catch (Exception e) {
-      LOG.error("Error putting entities", e);
-      throw new WebApplicationException(e,
-          Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  private String parseApplicationId(String appId) {
-    // Make sure the appId is not null and is valid
-    ApplicationId appID;
-    try {
-      if (appId != null) {
-        return ConverterUtils.toApplicationId(appId.trim()).toString();
-      } else {
-        return null;
-      }
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
-  private AppLevelAggregatorService
-      getAggregatorService(HttpServletRequest req, String appIdToParse) {
-    String appIdString = parseApplicationId(appIdToParse);
-    final AppLevelServiceManager serviceManager =
-        (AppLevelServiceManager) context.getAttribute(
-            PerNodeAggregatorServer.AGGREGATOR_COLLECTION_ATTR_KEY);
-    return serviceManager.getService(appIdString);
-  }
-
-  private void init(HttpServletResponse response) {
-    response.setContentType(null);
-  }
-
-  private UserGroupInformation getUser(HttpServletRequest req) {
-    String remoteUser = req.getRemoteUser();
-    UserGroupInformation callerUgi = null;
-    if (remoteUser != null) {
-      callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
-    }
-    return callerUgi;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
new file mode 100644
index 0000000..cdc4e35
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The top-level server for the per-node timeline aggregator collection. 
Currently
+ * it is defined as an auxiliary service to accommodate running within another
+ * daemon (e.g. node manager).
+ */
+@Private
+@Unstable
+public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
+  private static final Log LOG =
+      LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class);
+  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  private final TimelineAggregatorsCollection aggregatorCollection;
+
+  public PerNodeTimelineAggregatorsAuxService() {
+    // use the same singleton
+    this(TimelineAggregatorsCollection.getInstance());
+  }
+
+  @VisibleForTesting PerNodeTimelineAggregatorsAuxService(
+      TimelineAggregatorsCollection aggregatorCollection) {
+    super("timeline_aggregator");
+    this.aggregatorCollection = aggregatorCollection;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    aggregatorCollection.init(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    aggregatorCollection.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    aggregatorCollection.stop();
+    super.serviceStop();
+  }
+
+  // these methods can be used as the basis for future service methods if the
+  // per-node aggregator runs separate from the node manager
+  /**
+   * Creates and adds an app level aggregator for the specified application id.
+   * The aggregator is also initialized and started. If the service already
+   * exists, no new service is created.
+   *
+   * @return whether it was added successfully
+   */
+  public boolean addApplication(ApplicationId appId) {
+    String appIdString = appId.toString();
+    AppLevelTimelineAggregator aggregator =
+        new AppLevelTimelineAggregator(appIdString);
+    return (aggregatorCollection.putIfAbsent(appIdString, aggregator)
+        == aggregator);
+  }
+
+  /**
+   * Removes the app level aggregator for the specified application id. The
+   * aggregator is also stopped as a result. If the aggregator does not exist, 
no
+   * change is made.
+   *
+   * @return whether it was removed successfully
+   */
+  public boolean removeApplication(ApplicationId appId) {
+    String appIdString = appId.toString();
+    return aggregatorCollection.remove(appIdString);
+  }
+
+  /**
+   * Creates and adds an app level aggregator for the specified application id.
+   * The aggregator is also initialized and started. If the aggregator already
+   * exists, no new aggregator is created.
+   */
+  @Override
+  public void initializeContainer(ContainerInitializationContext context) {
+    // intercept the event of the AM container being created and initialize the
+    // app level aggregator service
+    if (isApplicationMaster(context)) {
+      ApplicationId appId = context.getContainerId().
+          getApplicationAttemptId().getApplicationId();
+      addApplication(appId);
+    }
+  }
+
+  /**
+   * Removes the app level aggregator for the specified application id. The
+   * aggregator is also stopped as a result. If the aggregator does not exist, 
no
+   * change is made.
+   */
+  @Override
+  public void stopContainer(ContainerTerminationContext context) {
+    // intercept the event of the AM container being stopped and remove the app
+    // level aggregator service
+    if (isApplicationMaster(context)) {
+      ApplicationId appId = context.getContainerId().
+          getApplicationAttemptId().getApplicationId();
+      removeApplication(appId);
+    }
+  }
+
+  private boolean isApplicationMaster(ContainerContext context) {
+    // TODO this is based on a (shaky) assumption that the container id (the
+    // last field of the full container id) for an AM is always 1
+    // we want to make this much more reliable
+    ContainerId containerId = context.getContainerId();
+    return containerId.getContainerId() == 1L;
+  }
+
+  @VisibleForTesting
+  boolean hasApplication(String appId) {
+    return aggregatorCollection.containsKey(appId);
+  }
+
+  @Override
+  public void initializeApplication(ApplicationInitializationContext context) {
+  }
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext context) {
+  }
+
+  @Override
+  public ByteBuffer getMetaData() {
+    // TODO currently it is not used; we can return a more meaningful data when
+    // we connect it with an AM
+    return ByteBuffer.allocate(0);
+  }
+
+  @VisibleForTesting
+  public static PerNodeTimelineAggregatorsAuxService launchServer(String[] 
args) {
+    Thread
+      .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    
StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, 
args,
+        LOG);
+    PerNodeTimelineAggregatorsAuxService auxService = null;
+    try {
+      auxService = new PerNodeTimelineAggregatorsAuxService();
+      ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
+          SHUTDOWN_HOOK_PRIORITY);
+      YarnConfiguration conf = new YarnConfiguration();
+      auxService.init(conf);
+      auxService.start();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting PerNodeAggregatorServer", t);
+      ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
+    }
+    return auxService;
+  }
+
+  private static class ShutdownHook implements Runnable {
+    private final PerNodeTimelineAggregatorsAuxService auxService;
+
+    public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) {
+      this.auxService = auxService;
+    }
+
+    public void run() {
+      auxService.stop();
+    }
+  }
+
+  public static void main(String[] args) {
+    launchServer(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
new file mode 100644
index 0000000..4227712
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage.
+ *
+ * Classes that extend this can putIfAbsent their own lifecycle management or
+ * customization of request handling.
+ */
+@Private
+@Unstable
+public abstract class TimelineAggregator extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
+
+  public TimelineAggregator(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  /**
+   * Handles entity writes. These writes are synchronous and are written to the
+   * backing storage without buffering/batching. If any entity already exists,
+   * it results in an update of the entity.
+   *
+   * This method should be reserved for selected critical entities and events.
+   * For normal voluminous writes one should use the async method
+   * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   */
+  public void postEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // Add this output temporarily for our prototype
+    // TODO remove this after we have an actual implementation
+    LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE");
+    LOG.info("postEntities(entities=" + entities + ", callerUgi=" +
+        callerUgi + ")");
+
+    // TODO implement
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
+          callerUgi + ")");
+    }
+  }
+
+  /**
+   * Handles entity writes in an asynchronous manner. The method returns as 
soon
+   * as validation is done. No promises are made on how quickly it will be
+   * written to the backing storage or if it will always be written to the
+   * backing storage. Multiple writes to the same entities may be batched and
+   * appropriate values updated and result in fewer writes to the backing
+   * storage.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   */
+  public void postEntitiesAsync(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // TODO implement
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
+          callerUgi + ")");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
new file mode 100644
index 0000000..7d42f94
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Singleton;
+
+/**
+ * The main per-node REST end point for timeline service writes. It is
+ * essentially a container service that routes requests to the appropriate
+ * per-app services.
+ */
+@Private
+@Unstable
+@Singleton
+@Path("/ws/v2/timeline")
+public class TimelineAggregatorWebService {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineAggregatorWebService.class);
+
+  private @Context ServletContext context;
+
+  @XmlRootElement(name = "about")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class AboutInfo {
+
+    private String about;
+
+    public AboutInfo() {
+
+    }
+
+    public AboutInfo(String about) {
+      this.about = about;
+    }
+
+    @XmlElement(name = "About")
+    public String getAbout() {
+      return about;
+    }
+
+    public void setAbout(String about) {
+      this.about = about;
+    }
+
+  }
+
+  /**
+   * Return the description of the timeline web services.
+   */
+  @GET
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public AboutInfo about(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res) {
+    init(res);
+    return new AboutInfo("Timeline API");
+  }
+
+  /**
+   * Accepts writes to the aggregator, and returns a response. It simply routes
+   * the request to the app level aggregator. It expects an application as a
+   * context.
+   */
+  @PUT
+  @Path("/entities")
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public Response putEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @QueryParam("async") String async,
+      @QueryParam("appid") String appId,
+      TimelineEntities entities) {
+    init(res);
+    UserGroupInformation callerUgi = getUser(req);
+    if (callerUgi == null) {
+      String msg = "The owner of the posted timeline entities is not set";
+      LOG.error(msg);
+      throw new ForbiddenException(msg);
+    }
+
+    // TODO how to express async posts and handle them
+    boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+
+    try {
+      appId = parseApplicationId(appId);
+      if (appId == null) {
+        return Response.status(Response.Status.BAD_REQUEST).build();
+      }
+      TimelineAggregator service = getAggregatorService(req, appId);
+      if (service == null) {
+        LOG.error("Application not found");
+        throw new NotFoundException(); // different exception?
+      }
+      service.postEntities(entities, callerUgi);
+      return Response.ok().build();
+    } catch (Exception e) {
+      LOG.error("Error putting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  private String parseApplicationId(String appId) {
+    // Make sure the appId is not null and is valid
+    ApplicationId appID;
+    try {
+      if (appId != null) {
+        return ConverterUtils.toApplicationId(appId.trim()).toString();
+      } else {
+        return null;
+      }
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  private TimelineAggregator
+      getAggregatorService(HttpServletRequest req, String appIdToParse) {
+    String appIdString = parseApplicationId(appIdToParse);
+    final TimelineAggregatorsCollection aggregatorCollection =
+        (TimelineAggregatorsCollection) context.getAttribute(
+            TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY);
+    return aggregatorCollection.get(appIdString);
+  }
+
+  private void init(HttpServletResponse response) {
+    response.setContentType(null);
+  }
+
+  private UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUgi = null;
+    if (remoteUser != null) {
+      callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUgi;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
new file mode 100644
index 0000000..73b6d52
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+/**
+ * Class that manages adding and removing aggregators and their lifecycle. It
+ * provides thread safety access to the aggregators inside.
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ */
+@Private
+@Unstable
+public class TimelineAggregatorsCollection extends CompositeService {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineAggregatorsCollection.class);
+  private static final TimelineAggregatorsCollection INSTANCE =
+      new TimelineAggregatorsCollection();
+
+  // access to this map is synchronized with the map itself
+  private final Map<String, TimelineAggregator> aggregators =
+      Collections.synchronizedMap(
+          new HashMap<String, TimelineAggregator>());
+
+  // REST server for this aggregator collection
+  private HttpServer2 timelineRestServer;
+
+  static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
+
+  static TimelineAggregatorsCollection getInstance() {
+    return INSTANCE;
+  }
+
+  TimelineAggregatorsCollection() {
+    super(TimelineAggregatorsCollection.class.getName());
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    startWebApp();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (timelineRestServer != null) {
+      timelineRestServer.stop();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Put the aggregator into the collection if an aggregator mapped by id does
+   * not exist.
+   *
+   * @throws YarnRuntimeException if there was any exception in initializing 
and
+   * starting the app level service
+   * @return the aggregator associated with id after the potential put.
+   */
+  public TimelineAggregator putIfAbsent(String id, TimelineAggregator 
aggregator) {
+    synchronized (aggregators) {
+      TimelineAggregator aggregatorInTable = aggregators.get(id);
+      if (aggregatorInTable == null) {
+        try {
+          // initialize, start, and add it to the collection so it can be
+          // cleaned up when the parent shuts down
+          aggregator.init(getConfig());
+          aggregator.start();
+          aggregators.put(id, aggregator);
+          LOG.info("the aggregator for " + id + " was added");
+          return aggregator;
+        } catch (Exception e) {
+          throw new YarnRuntimeException(e);
+        }
+      } else {
+        String msg = "the aggregator for " + id + " already exists!";
+        LOG.error(msg);
+        return aggregatorInTable;
+      }
+    }
+  }
+
+  /**
+   * Removes the aggregator for the specified id. The aggregator is also 
stopped
+   * as a result. If the aggregator does not exist, no change is made.
+   *
+   * @return whether it was removed successfully
+   */
+  public boolean remove(String id) {
+    synchronized (aggregators) {
+      TimelineAggregator aggregator = aggregators.remove(id);
+      if (aggregator == null) {
+        String msg = "the aggregator for " + id + " does not exist!";
+        LOG.error(msg);
+        return false;
+      } else {
+        // stop the service to do clean up
+        aggregator.stop();
+        LOG.info("the aggregator service for " + id + " was removed");
+        return true;
+      }
+    }
+  }
+
+  /**
+   * Returns the aggregator for the specified id.
+   *
+   * @return the aggregator or null if it does not exist
+   */
+  public TimelineAggregator get(String id) {
+    return aggregators.get(id);
+  }
+
+  /**
+   * Returns whether the aggregator for the specified id exists in this
+   * collection.
+   */
+  public boolean containsKey(String id) {
+    return aggregators.containsKey(id);
+  }
+
+  /**
+   * Launch the REST web server for this aggregator collection
+   */
+  private void startWebApp() {
+    Configuration conf = getConfig();
+    // use the same ports as the old ATS for now; we could create new 
properties
+    // for the new timeline service if needed
+    String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+        YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+        WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
+    LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
+    try {
+      Configuration confForInfoServer = new Configuration(conf);
+      confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
+      HttpServer2.Builder builder = new HttpServer2.Builder()
+          .setName("timeline")
+          .setConf(conf)
+          .addEndpoint(URI.create("http://"; + bindAddress));
+      timelineRestServer = builder.build();
+      // TODO: replace this by an authentication filter in future.
+      HashMap<String, String> options = new HashMap<>();
+      String username = conf.get(HADOOP_HTTP_STATIC_USER,
+          DEFAULT_HADOOP_HTTP_STATIC_USER);
+      options.put(HADOOP_HTTP_STATIC_USER, username);
+      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
+          "static_user_filter_timeline",
+          StaticUserWebFilter.StaticUserFilter.class.getName(),
+          options, new String[] {"/*"});
+
+      timelineRestServer.addJerseyResourcePackage(
+          TimelineAggregatorWebService.class.getPackage().getName() + ";"
+              + GenericExceptionHandler.class.getPackage().getName() + ";"
+              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
+          "/*");
+      timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
+          TimelineAggregatorsCollection.getInstance());
+      timelineRestServer.start();
+    } catch (Exception e) {
+      String msg = "The per-node aggregator webapp failed to start.";
+      LOG.error(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
deleted file mode 100644
index c0af8c5..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-
-public class TestAppLevelAggregatorService {
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
deleted file mode 100644
index 3f981c7..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-public class TestAppLevelServiceManager {
-
-  @Test(timeout=60000)
-  public void testMultithreadedAdd() throws Exception {
-    final AppLevelServiceManager serviceManager =
-        spy(new AppLevelServiceManager());
-    doReturn(new Configuration()).when(serviceManager).getConfig();
-
-    final int NUM_APPS = 5;
-    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
-    for (int i = 0; i < NUM_APPS; i++) {
-      final String appId = String.valueOf(i);
-      Callable<Boolean> task = new Callable<Boolean>() {
-        public Boolean call() {
-          return serviceManager.addService(appId);
-        }
-      };
-      tasks.add(task);
-    }
-    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
-    try {
-      List<Future<Boolean>> futures = executor.invokeAll(tasks);
-      for (Future<Boolean> future: futures) {
-        assertTrue(future.get());
-      }
-    } finally {
-      executor.shutdownNow();
-    }
-    // check the keys
-    for (int i = 0; i < NUM_APPS; i++) {
-      assertTrue(serviceManager.hasService(String.valueOf(i)));
-    }
-  }
-
-  @Test
-  public void testMultithreadedAddAndRemove() throws Exception {
-    final AppLevelServiceManager serviceManager =
-        spy(new AppLevelServiceManager());
-    doReturn(new Configuration()).when(serviceManager).getConfig();
-
-    final int NUM_APPS = 5;
-    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
-    for (int i = 0; i < NUM_APPS; i++) {
-      final String appId = String.valueOf(i);
-      Callable<Boolean> task = new Callable<Boolean>() {
-        public Boolean call() {
-          return serviceManager.addService(appId) &&
-              serviceManager.removeService(appId);
-        }
-      };
-      tasks.add(task);
-    }
-    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
-    try {
-      List<Future<Boolean>> futures = executor.invokeAll(tasks);
-      for (Future<Boolean> future: futures) {
-        assertTrue(future.get());
-      }
-    } finally {
-      executor.shutdownNow();
-    }
-    // check the keys
-    for (int i = 0; i < NUM_APPS; i++) {
-      assertFalse(serviceManager.hasService(String.valueOf(i)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
new file mode 100644
index 0000000..8f95814
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+
+public class TestAppLevelTimelineAggregator {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
deleted file mode 100644
index 55953cd..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-public class TestBaseAggregatorService {
-
-}

Reply via email to