YARN-3210. Refactored timeline aggregator according to new code organization proposed in YARN-3166. Contributed by Li Lu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3ff7f06 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3ff7f06 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3ff7f06 Branch: refs/heads/YARN-2928 Commit: d3ff7f06cbc66d3a23c2551e7d4c752689f46afe Parents: e4d81eb Author: Zhijie Shen <zjs...@apache.org> Authored: Tue Mar 3 11:21:03 2015 -0800 Committer: Zhijie Shen <zjs...@apache.org> Committed: Tue Mar 3 11:25:17 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../distributedshell/TestDistributedShell.java | 4 +- .../hadoop-yarn-server-nodemanager/pom.xml | 5 - .../server/nodemanager/webapp/WebServer.java | 3 - .../TestTimelineServiceClientIntegration.java | 12 +- .../aggregator/AppLevelAggregatorService.java | 57 ---- .../aggregator/AppLevelServiceManager.java | 136 ---------- .../AppLevelServiceManagerProvider.java | 33 --- .../aggregator/AppLevelTimelineAggregator.java | 57 ++++ .../aggregator/BaseAggregatorService.java | 107 -------- .../aggregator/PerNodeAggregatorServer.java | 268 ------------------- .../aggregator/PerNodeAggregatorWebService.java | 180 ------------- .../PerNodeTimelineAggregatorsAuxService.java | 212 +++++++++++++++ .../aggregator/TimelineAggregator.java | 107 ++++++++ .../TimelineAggregatorWebService.java | 180 +++++++++++++ .../TimelineAggregatorsCollection.java | 203 ++++++++++++++ .../TestAppLevelAggregatorService.java | 23 -- .../aggregator/TestAppLevelServiceManager.java | 102 ------- .../TestAppLevelTimelineAggregator.java | 23 ++ .../aggregator/TestBaseAggregatorService.java | 23 -- .../aggregator/TestPerNodeAggregatorServer.java | 149 ----------- ...estPerNodeTimelineAggregatorsAuxService.java | 150 +++++++++++ .../aggregator/TestTimelineAggregator.java | 23 ++ .../TestTimelineAggregatorsCollection.java | 108 ++++++++ 24 files changed, 1074 insertions(+), 1094 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b13475a..0548460 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -23,6 +23,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3125. Made the distributed shell use timeline service next gen and add an integration test for it. (Junping Du and Li Lu via zjshen) + YARN-3210. Refactored timeline aggregator according to new code + organization proposed in YARN-3166. (Li Lu via zjshen) + IMPROVEMENTS OPTIMIZATIONS http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 71466cb..313dc97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -49,7 +49,7 @@ import 
org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer; +import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -96,7 +96,7 @@ public class TestDistributedShell { // enable aux-service based timeline aggregators conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME - + ".class", PerNodeAggregatorServer.class.getName()); + + ".class", PerNodeTimelineAggregatorsAuxService.class.getName()); } conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 26a33b4..b1efa5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -53,11 +53,6 @@ <artifactId>hadoop-yarn-api</artifactId> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-timelineservice</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> 
</dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java index 77deaed..fdff480 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java @@ -29,9 +29,6 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManager; -import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManagerProvider; -import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorWebService; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index a5159a2..32ee5d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -6,7 +6,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer; +import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -14,13 +14,13 @@ import org.junit.Test; import static org.junit.Assert.fail; public class TestTimelineServiceClientIntegration { - private static PerNodeAggregatorServer server; + private static PerNodeTimelineAggregatorsAuxService auxService; @BeforeClass public static void setupClass() throws Exception { try { - server = PerNodeAggregatorServer.launchServer(new String[0]); - server.addApplication(ApplicationId.newInstance(0, 1)); + auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]); + auxService.addApplication(ApplicationId.newInstance(0, 1)); } catch (ExitUtil.ExitException e) { fail(); 
} @@ -28,8 +28,8 @@ public class TestTimelineServiceClientIntegration { @AfterClass public static void tearDownClass() throws Exception { - if (server != null) { - server.stop(); + if (auxService != null) { + auxService.stop(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java deleted file mode 100644 index bf72fb9..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; - -/** - * Service that handles writes to the timeline service and writes them to the - * backing storage for a given YARN application. - * - * App-related lifecycle management is handled by this service. - */ -@Private -@Unstable -public class AppLevelAggregatorService extends BaseAggregatorService { - private final String applicationId; - // TODO define key metadata such as flow metadata, user, and queue - - public AppLevelAggregatorService(String applicationId) { - super(AppLevelAggregatorService.class.getName() + " - " + applicationId); - this.applicationId = applicationId; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java deleted file mode 100644 index 
05d321f..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; - -/** - * Class that manages adding and removing app level aggregator services and - * their lifecycle. It provides thread safety access to the app level services. - * - * It is a singleton, and instances should be obtained via - * {@link #getInstance()}. 
- */ -@Private -@Unstable -public class AppLevelServiceManager extends CompositeService { - private static final Log LOG = - LogFactory.getLog(AppLevelServiceManager.class); - private static final AppLevelServiceManager INSTANCE = - new AppLevelServiceManager(); - - // access to this map is synchronized with the map itself - private final Map<String,AppLevelAggregatorService> services = - Collections.synchronizedMap( - new HashMap<String,AppLevelAggregatorService>()); - - static AppLevelServiceManager getInstance() { - return INSTANCE; - } - - AppLevelServiceManager() { - super(AppLevelServiceManager.class.getName()); - } - - /** - * Creates and adds an app level aggregator service for the specified - * application id. The service is also initialized and started. If the service - * already exists, no new service is created. - * - * @throws YarnRuntimeException if there was any exception in initializing and - * starting the app level service - * @return whether it was added successfully - */ - public boolean addService(String appId) { - synchronized (services) { - AppLevelAggregatorService service = services.get(appId); - if (service == null) { - try { - service = new AppLevelAggregatorService(appId); - // initialize, start, and add it to the parent service so it can be - // cleaned up when the parent shuts down - service.init(getConfig()); - service.start(); - services.put(appId, service); - LOG.info("the application aggregator service for " + appId + - " was added"); - return true; - } catch (Exception e) { - throw new YarnRuntimeException(e); - } - } else { - String msg = "the application aggregator service for " + appId + - " already exists!"; - LOG.error(msg); - return false; - } - } - } - - /** - * Removes the app level aggregator service for the specified application id. - * The service is also stopped as a result. If the service does not exist, no - * change is made. 
- * - * @return whether it was removed successfully - */ - public boolean removeService(String appId) { - synchronized (services) { - AppLevelAggregatorService service = services.remove(appId); - if (service == null) { - String msg = "the application aggregator service for " + appId + - " does not exist!"; - LOG.error(msg); - return false; - } else { - // stop the service to do clean up - service.stop(); - LOG.info("the application aggregator service for " + appId + - " was removed"); - return true; - } - } - } - - /** - * Returns the app level aggregator service for the specified application id. - * - * @return the app level aggregator service or null if it does not exist - */ - public AppLevelAggregatorService getService(String appId) { - return services.get(appId); - } - - /** - * Returns whether the app level aggregator service for the specified - * application id exists. - */ - public boolean hasService(String appId) { - return services.containsKey(appId); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java deleted file mode 100644 index 8768575..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import com.google.inject.Provider; - -/** - * A guice provider that provides a global singleton instance of - * AppLevelServiceManager. 
- */ -public class AppLevelServiceManagerProvider - implements Provider<AppLevelServiceManager> { - @Override - public AppLevelServiceManager get() { - return AppLevelServiceManager.getInstance(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java new file mode 100644 index 0000000..95ec9f8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage for a given YARN application. + * + * App-related lifecycle management is handled by this service. + */ +@Private +@Unstable +public class AppLevelTimelineAggregator extends TimelineAggregator { + private final String applicationId; + // TODO define key metadata such as flow metadata, user, and queue + + public AppLevelTimelineAggregator(String applicationId) { + super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId); + this.applicationId = applicationId; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java deleted file mode 100644 index e362139..0000000 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; - -/** - * Service that handles writes to the timeline service and writes them to the - * backing storage. - * - * Classes that extend this can add their own lifecycle management or - * customization of request handling. 
- */ -@Private -@Unstable -public class BaseAggregatorService extends CompositeService { - private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class); - - public BaseAggregatorService(String name) { - super(name); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - - /** - * Handles entity writes. These writes are synchronous and are written to the - * backing storage without buffering/batching. If any entity already exists, - * it results in an update of the entity. - * - * This method should be reserved for selected critical entities and events. - * For normal voluminous writes one should use the async method - * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. - * - * @param entities entities to post - * @param callerUgi the caller UGI - */ - public void postEntities(TimelineEntities entities, - UserGroupInformation callerUgi) { - // Add this output temporarily for our prototype - // TODO remove this after we have an actual implementation - LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE"); - LOG.info("postEntities(entities=" + entities + ", callerUgi=" + - callerUgi + ")"); - - // TODO implement - if (LOG.isDebugEnabled()) { - LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + - callerUgi + ")"); - } - } - - /** - * Handles entity writes in an asynchronous manner. The method returns as soon - * as validation is done. No promises are made on how quickly it will be - * written to the backing storage or if it will always be written to the - * backing storage. Multiple writes to the same entities may be batched and - * appropriate values updated and result in fewer writes to the backing - * storage. 
- * - * @param entities entities to post - * @param callerUgi the caller UGI - */ - public void postEntitiesAsync(TimelineEntities entities, - UserGroupInformation callerUgi) { - // TODO implement - if (LOG.isDebugEnabled()) { - LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + - callerUgi + ")"); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java deleted file mode 100644 index deb21c7..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java +++ /dev/null @@ -1,268 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.HashMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.lib.StaticUserWebFilter; -import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; -import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; -import org.apache.hadoop.yarn.server.api.AuxiliaryService; -import org.apache.hadoop.yarn.server.api.ContainerContext; -import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; -import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; -import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import org.apache.hadoop.http.HttpServer2; - -import 
com.google.common.annotations.VisibleForTesting; - -import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; - -/** - * The top-level server for the per-node timeline aggregator service. Currently - * it is defined as an auxiliary service to accommodate running within another - * daemon (e.g. node manager). - */ -@Private -@Unstable -public class PerNodeAggregatorServer extends AuxiliaryService { - private static final Log LOG = - LogFactory.getLog(PerNodeAggregatorServer.class); - private static final int SHUTDOWN_HOOK_PRIORITY = 30; - static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection"; - - private final AppLevelServiceManager serviceManager; - private HttpServer2 timelineRestServer; - - public PerNodeAggregatorServer() { - // use the same singleton - this(AppLevelServiceManager.getInstance()); - } - - @VisibleForTesting - PerNodeAggregatorServer(AppLevelServiceManager serviceManager) { - super("timeline_aggregator"); - this.serviceManager = serviceManager; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - serviceManager.init(conf); - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - serviceManager.start(); - startWebApp(); - } - - @Override - protected void serviceStop() throws Exception { - if (timelineRestServer != null) { - timelineRestServer.stop(); - } - // stop the service manager - serviceManager.stop(); - super.serviceStop(); - } - - private void startWebApp() { - Configuration conf = getConfig(); - // use the same ports as the old ATS for now; we could create new properties - // for the new timeline service if needed - String bindAddress = WebAppUtils.getWebAppBindURL(conf, - YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, - WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); - LOG.info("Instantiating the 
per-node aggregator webapp at " + bindAddress); - try { - Configuration confForInfoServer = new Configuration(conf); - confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); - HttpServer2.Builder builder = new HttpServer2.Builder() - .setName("timeline") - .setConf(conf) - .addEndpoint(URI.create("http://" + bindAddress)); - timelineRestServer = builder.build(); - // TODO: replace this by an authentication filter in future. - HashMap<String, String> options = new HashMap<>(); - String username = conf.get(HADOOP_HTTP_STATIC_USER, - DEFAULT_HADOOP_HTTP_STATIC_USER); - options.put(HADOOP_HTTP_STATIC_USER, username); - HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), - "static_user_filter_timeline", - StaticUserWebFilter.StaticUserFilter.class.getName(), - options, new String[] {"/*"}); - - timelineRestServer.addJerseyResourcePackage( - PerNodeAggregatorWebService.class.getPackage().getName() + ";" - + GenericExceptionHandler.class.getPackage().getName() + ";" - + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), - "/*"); - timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY, - AppLevelServiceManager.getInstance()); - timelineRestServer.start(); - } catch (Exception e) { - String msg = "The per-node aggregator webapp failed to start."; - LOG.error(msg, e); - throw new YarnRuntimeException(msg, e); - } - } - - // these methods can be used as the basis for future service methods if the - // per-node aggregator runs separate from the node manager - /** - * Creates and adds an app level aggregator service for the specified - * application id. The service is also initialized and started. If the service - * already exists, no new service is created. - * - * @return whether it was added successfully - */ - public boolean addApplication(ApplicationId appId) { - String appIdString = appId.toString(); - return serviceManager.addService(appIdString); - } - - /** - * Removes the app level aggregator service for the specified application id. 
- * The service is also stopped as a result. If the service does not exist, no - * change is made. - * - * @return whether it was removed successfully - */ - public boolean removeApplication(ApplicationId appId) { - String appIdString = appId.toString(); - return serviceManager.removeService(appIdString); - } - - /** - * Creates and adds an app level aggregator service for the specified - * application id. The service is also initialized and started. If the service - * already exists, no new service is created. - */ - @Override - public void initializeContainer(ContainerInitializationContext context) { - // intercept the event of the AM container being created and initialize the - // app level aggregator service - if (isApplicationMaster(context)) { - ApplicationId appId = context.getContainerId(). - getApplicationAttemptId().getApplicationId(); - addApplication(appId); - } - } - - /** - * Removes the app level aggregator service for the specified application id. - * The service is also stopped as a result. If the service does not exist, no - * change is made. - */ - @Override - public void stopContainer(ContainerTerminationContext context) { - // intercept the event of the AM container being stopped and remove the app - // level aggregator service - if (isApplicationMaster(context)) { - ApplicationId appId = context.getContainerId(). 
- getApplicationAttemptId().getApplicationId(); - removeApplication(appId); - } - } - - private boolean isApplicationMaster(ContainerContext context) { - // TODO this is based on a (shaky) assumption that the container id (the - // last field of the full container id) for an AM is always 1 - // we want to make this much more reliable - ContainerId containerId = context.getContainerId(); - return containerId.getContainerId() == 1L; - } - - @VisibleForTesting - boolean hasApplication(String appId) { - return serviceManager.hasService(appId); - } - - @Override - public void initializeApplication(ApplicationInitializationContext context) { - } - - @Override - public void stopApplication(ApplicationTerminationContext context) { - } - - @Override - public ByteBuffer getMetaData() { - // TODO currently it is not used; we can return a more meaningful data when - // we connect it with an AM - return ByteBuffer.allocate(0); - } - - @VisibleForTesting - public static PerNodeAggregatorServer launchServer(String[] args) { - Thread - .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); - StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args, - LOG); - PerNodeAggregatorServer server = null; - try { - server = new PerNodeAggregatorServer(); - ShutdownHookManager.get().addShutdownHook(new ShutdownHook(server), - SHUTDOWN_HOOK_PRIORITY); - YarnConfiguration conf = new YarnConfiguration(); - server.init(conf); - server.start(); - } catch (Throwable t) { - LOG.fatal("Error starting PerNodeAggregatorServer", t); - ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer"); - } - return server; - } - - private static class ShutdownHook implements Runnable { - private final PerNodeAggregatorServer server; - - public ShutdownHook(PerNodeAggregatorServer server) { - this.server = server; - } - - public void run() { - server.stop(); - } - } - - public static void main(String[] args) { - launchServer(args); - } -} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java deleted file mode 100644 index ffe099e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import javax.servlet.ServletContext; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.*; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.webapp.ForbiddenException; -import org.apache.hadoop.yarn.webapp.NotFoundException; - -import com.google.inject.Singleton; - -/** - * The main per-node REST end point for timeline service writes. It is - * essentially a container service that routes requests to the appropriate - * per-app services. 
- */ -@Private -@Unstable -@Singleton -@Path("/ws/v2/timeline") -public class PerNodeAggregatorWebService { - private static final Log LOG = - LogFactory.getLog(PerNodeAggregatorWebService.class); - - private @Context ServletContext context; - - @XmlRootElement(name = "about") - @XmlAccessorType(XmlAccessType.NONE) - @Public - @Unstable - public static class AboutInfo { - - private String about; - - public AboutInfo() { - - } - - public AboutInfo(String about) { - this.about = about; - } - - @XmlElement(name = "About") - public String getAbout() { - return about; - } - - public void setAbout(String about) { - this.about = about; - } - - } - - /** - * Return the description of the timeline web services. - */ - @GET - @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public AboutInfo about( - @Context HttpServletRequest req, - @Context HttpServletResponse res) { - init(res); - return new AboutInfo("Timeline API"); - } - - /** - * Accepts writes to the aggregator, and returns a response. It simply routes - * the request to the app level aggregator. It expects an application as a - * context. 
- */ - @PUT - @Path("/entities") - @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public Response putEntities( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - @QueryParam("async") String async, - @QueryParam("appid") String appId, - TimelineEntities entities) { - init(res); - UserGroupInformation callerUgi = getUser(req); - if (callerUgi == null) { - String msg = "The owner of the posted timeline entities is not set"; - LOG.error(msg); - throw new ForbiddenException(msg); - } - - // TODO how to express async posts and handle them - boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); - - try { - appId = parseApplicationId(appId); - if (appId == null) { - return Response.status(Response.Status.BAD_REQUEST).build(); - } - AppLevelAggregatorService service = getAggregatorService(req, appId); - if (service == null) { - LOG.error("Application not found"); - throw new NotFoundException(); // different exception? - } - service.postEntities(entities, callerUgi); - return Response.ok().build(); - } catch (Exception e) { - LOG.error("Error putting entities", e); - throw new WebApplicationException(e, - Response.Status.INTERNAL_SERVER_ERROR); - } - } - - private String parseApplicationId(String appId) { - // Make sure the appId is not null and is valid - ApplicationId appID; - try { - if (appId != null) { - return ConverterUtils.toApplicationId(appId.trim()).toString(); - } else { - return null; - } - } catch (Exception e) { - return null; - } - } - - private AppLevelAggregatorService - getAggregatorService(HttpServletRequest req, String appIdToParse) { - String appIdString = parseApplicationId(appIdToParse); - final AppLevelServiceManager serviceManager = - (AppLevelServiceManager) context.getAttribute( - PerNodeAggregatorServer.AGGREGATOR_COLLECTION_ATTR_KEY); - return serviceManager.getService(appIdString); - } - - private void init(HttpServletResponse response) { - response.setContentType(null); - 
} - - private UserGroupInformation getUser(HttpServletRequest req) { - String remoteUser = req.getRemoteUser(); - UserGroupInformation callerUgi = null; - if (remoteUser != null) { - callerUgi = UserGroupInformation.createRemoteUser(remoteUser); - } - return callerUgi; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java new file mode 100644 index 0000000..cdc4e35 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ContainerContext; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The top-level server for the per-node timeline aggregator collection. Currently + * it is defined as an auxiliary service to accommodate running within another + * daemon (e.g. node manager). 
+ */ +@Private +@Unstable +public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService { + private static final Log LOG = + LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class); + private static final int SHUTDOWN_HOOK_PRIORITY = 30; + + private final TimelineAggregatorsCollection aggregatorCollection; + + public PerNodeTimelineAggregatorsAuxService() { + // use the same singleton + this(TimelineAggregatorsCollection.getInstance()); + } + + @VisibleForTesting PerNodeTimelineAggregatorsAuxService( + TimelineAggregatorsCollection aggregatorCollection) { + super("timeline_aggregator"); + this.aggregatorCollection = aggregatorCollection; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + aggregatorCollection.init(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + aggregatorCollection.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + aggregatorCollection.stop(); + super.serviceStop(); + } + + // these methods can be used as the basis for future service methods if the + // per-node aggregator runs separate from the node manager + /** + * Creates and adds an app level aggregator for the specified application id. + * The aggregator is also initialized and started. If the service already + * exists, no new service is created. + * + * @return whether it was added successfully + */ + public boolean addApplication(ApplicationId appId) { + String appIdString = appId.toString(); + AppLevelTimelineAggregator aggregator = + new AppLevelTimelineAggregator(appIdString); + return (aggregatorCollection.putIfAbsent(appIdString, aggregator) + == aggregator); + } + + /** + * Removes the app level aggregator for the specified application id. The + * aggregator is also stopped as a result. If the aggregator does not exist, no + * change is made. 
+ * + * @return whether it was removed successfully + */ + public boolean removeApplication(ApplicationId appId) { + String appIdString = appId.toString(); + return aggregatorCollection.remove(appIdString); + } + + /** + * Creates and adds an app level aggregator for the specified application id. + * The aggregator is also initialized and started. If the aggregator already + * exists, no new aggregator is created. + */ + @Override + public void initializeContainer(ContainerInitializationContext context) { + // intercept the event of the AM container being created and initialize the + // app level aggregator service + if (isApplicationMaster(context)) { + ApplicationId appId = context.getContainerId(). + getApplicationAttemptId().getApplicationId(); + addApplication(appId); + } + } + + /** + * Removes the app level aggregator for the specified application id. The + * aggregator is also stopped as a result. If the aggregator does not exist, no + * change is made. + */ + @Override + public void stopContainer(ContainerTerminationContext context) { + // intercept the event of the AM container being stopped and remove the app + // level aggregator service + if (isApplicationMaster(context)) { + ApplicationId appId = context.getContainerId(). 
+ getApplicationAttemptId().getApplicationId(); + removeApplication(appId); + } + } + + private boolean isApplicationMaster(ContainerContext context) { + // TODO this is based on a (shaky) assumption that the container id (the + // last field of the full container id) for an AM is always 1 + // we want to make this much more reliable + ContainerId containerId = context.getContainerId(); + return containerId.getContainerId() == 1L; + } + + @VisibleForTesting + boolean hasApplication(String appId) { + return aggregatorCollection.containsKey(appId); + } + + @Override + public void initializeApplication(ApplicationInitializationContext context) { + } + + @Override + public void stopApplication(ApplicationTerminationContext context) { + } + + @Override + public ByteBuffer getMetaData() { + // TODO currently it is not used; we can return a more meaningful data when + // we connect it with an AM + return ByteBuffer.allocate(0); + } + + @VisibleForTesting + public static PerNodeTimelineAggregatorsAuxService launchServer(String[] args) { + Thread + .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, args, + LOG); + PerNodeTimelineAggregatorsAuxService auxService = null; + try { + auxService = new PerNodeTimelineAggregatorsAuxService(); + ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService), + SHUTDOWN_HOOK_PRIORITY); + YarnConfiguration conf = new YarnConfiguration(); + auxService.init(conf); + auxService.start(); + } catch (Throwable t) { + LOG.fatal("Error starting PerNodeAggregatorServer", t); + ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer"); + } + return auxService; + } + + private static class ShutdownHook implements Runnable { + private final PerNodeTimelineAggregatorsAuxService auxService; + + public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) { + this.auxService = auxService; + } + + public void run() { + 
auxService.stop(); + } + } + + public static void main(String[] args) { + launchServer(args); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java new file mode 100644 index 0000000..4227712 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage. + * + * Classes that extend this can add their own lifecycle management or + * customization of request handling. + */ +@Private +@Unstable +public abstract class TimelineAggregator extends CompositeService { + private static final Log LOG = LogFactory.getLog(TimelineAggregator.class); + + public TimelineAggregator(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + /** + * Handles entity writes. These writes are synchronous and are written to the + * backing storage without buffering/batching. If any entity already exists, + * it results in an update of the entity. + * + * This method should be reserved for selected critical entities and events. + * For normal voluminous writes one should use the async method + * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. 
+ * + * @param entities entities to post + * @param callerUgi the caller UGI + */ + public void postEntities(TimelineEntities entities, + UserGroupInformation callerUgi) { + // Add this output temporarily for our prototype + // TODO remove this after we have an actual implementation + LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE"); + LOG.info("postEntities(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + + // TODO implement + if (LOG.isDebugEnabled()) { + LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + } + + /** + * Handles entity writes in an asynchronous manner. The method returns as soon + * as validation is done. No promises are made on how quickly it will be + * written to the backing storage or if it will always be written to the + * backing storage. Multiple writes to the same entities may be batched and + * appropriate values updated and result in fewer writes to the backing + * storage. + * + * @param entities entities to post + * @param callerUgi the caller UGI + */ + public void postEntitiesAsync(TimelineEntities entities, + UserGroupInformation callerUgi) { + // TODO implement + if (LOG.isDebugEnabled()) { + LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java new file mode 100644 index 0000000..7d42f94 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.*; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.webapp.ForbiddenException; +import org.apache.hadoop.yarn.webapp.NotFoundException; + +import com.google.inject.Singleton; + +/** + * The main per-node REST end point for timeline service writes. It is + * essentially a container service that routes requests to the appropriate + * per-app services. 
+ */ +@Private +@Unstable +@Singleton +@Path("/ws/v2/timeline") +public class TimelineAggregatorWebService { + private static final Log LOG = + LogFactory.getLog(TimelineAggregatorWebService.class); + + private @Context ServletContext context; + + @XmlRootElement(name = "about") + @XmlAccessorType(XmlAccessType.NONE) + @Public + @Unstable + public static class AboutInfo { + + private String about; + + public AboutInfo() { + + } + + public AboutInfo(String about) { + this.about = about; + } + + @XmlElement(name = "About") + public String getAbout() { + return about; + } + + public void setAbout(String about) { + this.about = about; + } + + } + + /** + * Return the description of the timeline web services. + */ + @GET + @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public AboutInfo about( + @Context HttpServletRequest req, + @Context HttpServletResponse res) { + init(res); + return new AboutInfo("Timeline API"); + } + + /** + * Accepts writes to the aggregator, and returns a response. It simply routes + * the request to the app level aggregator. It expects an application as a + * context. 
+ */ + @PUT + @Path("/entities") + @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public Response putEntities( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + TimelineEntities entities) { + init(res); + UserGroupInformation callerUgi = getUser(req); + if (callerUgi == null) { + String msg = "The owner of the posted timeline entities is not set"; + LOG.error(msg); + throw new ForbiddenException(msg); + } + + // TODO how to express async posts and handle them + boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); + + try { + appId = parseApplicationId(appId); + if (appId == null) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + TimelineAggregator service = getAggregatorService(req, appId); + if (service == null) { + LOG.error("Application not found"); + throw new NotFoundException(); // different exception? + } + service.postEntities(entities, callerUgi); + return Response.ok().build(); + } catch (Exception e) { + LOG.error("Error putting entities", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); + } + } + + private String parseApplicationId(String appId) { + // Make sure the appId is not null and is valid + ApplicationId appID; + try { + if (appId != null) { + return ConverterUtils.toApplicationId(appId.trim()).toString(); + } else { + return null; + } + } catch (Exception e) { + return null; + } + } + + private TimelineAggregator + getAggregatorService(HttpServletRequest req, String appIdToParse) { + String appIdString = parseApplicationId(appIdToParse); + final TimelineAggregatorsCollection aggregatorCollection = + (TimelineAggregatorsCollection) context.getAttribute( + TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY); + return aggregatorCollection.get(appIdString); + } + + private void init(HttpServletResponse response) { + 
response.setContentType(null); + } + + private UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + UserGroupInformation callerUgi = null; + if (remoteUser != null) { + callerUgi = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUgi; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java new file mode 100644 index 0000000..73b6d52 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import java.net.URI; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.http.lib.StaticUserWebFilter; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; + +/** + * Class that manages adding and removing aggregators and their lifecycle. It + * provides thread safety access to the aggregators inside. + * + * It is a singleton, and instances should be obtained via + * {@link #getInstance()}. 
+ */ +@Private +@Unstable +public class TimelineAggregatorsCollection extends CompositeService { + private static final Log LOG = + LogFactory.getLog(TimelineAggregatorsCollection.class); + private static final TimelineAggregatorsCollection INSTANCE = + new TimelineAggregatorsCollection(); + + // access to this map is synchronized with the map itself + private final Map<String, TimelineAggregator> aggregators = + Collections.synchronizedMap( + new HashMap<String, TimelineAggregator>()); + + // REST server for this aggregator collection + private HttpServer2 timelineRestServer; + + static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection"; + + static TimelineAggregatorsCollection getInstance() { + return INSTANCE; + } + + TimelineAggregatorsCollection() { + super(TimelineAggregatorsCollection.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + startWebApp(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (timelineRestServer != null) { + timelineRestServer.stop(); + } + super.serviceStop(); + } + + /** + * Put the aggregator into the collection if an aggregator mapped by id does + * not exist. + * + * @throws YarnRuntimeException if there was any exception in initializing and + * starting the app level service + * @return the aggregator associated with id after the potential put. 
+ */ + public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) { + synchronized (aggregators) { + TimelineAggregator aggregatorInTable = aggregators.get(id); + if (aggregatorInTable == null) { + try { + // initialize, start, and add it to the collection so it can be + // cleaned up when the parent shuts down + aggregator.init(getConfig()); + aggregator.start(); + aggregators.put(id, aggregator); + LOG.info("the aggregator for " + id + " was added"); + return aggregator; + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } else { + String msg = "the aggregator for " + id + " already exists!"; + LOG.error(msg); + return aggregatorInTable; + } + } + } + + /** + * Removes the aggregator for the specified id. The aggregator is also stopped + * as a result. If the aggregator does not exist, no change is made. + * + * @return whether it was removed successfully + */ + public boolean remove(String id) { + synchronized (aggregators) { + TimelineAggregator aggregator = aggregators.remove(id); + if (aggregator == null) { + String msg = "the aggregator for " + id + " does not exist!"; + LOG.error(msg); + return false; + } else { + // stop the service to do clean up + aggregator.stop(); + LOG.info("the aggregator service for " + id + " was removed"); + return true; + } + } + } + + /** + * Returns the aggregator for the specified id. + * + * @return the aggregator or null if it does not exist + */ + public TimelineAggregator get(String id) { + return aggregators.get(id); + } + + /** + * Returns whether the aggregator for the specified id exists in this + * collection. 
+ */ + public boolean containsKey(String id) { + return aggregators.containsKey(id); + } + + /** + * Launch the REST web server for this aggregator collection + */ + private void startWebApp() { + Configuration conf = getConfig(); + // use the same ports as the old ATS for now; we could create new properties + // for the new timeline service if needed + String bindAddress = WebAppUtils.getWebAppBindURL(conf, + YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); + LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress); + try { + Configuration confForInfoServer = new Configuration(conf); + confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); + HttpServer2.Builder builder = new HttpServer2.Builder() + .setName("timeline") + .setConf(conf) + .addEndpoint(URI.create("http://" + bindAddress)); + timelineRestServer = builder.build(); + // TODO: replace this by an authentication filter in future. + HashMap<String, String> options = new HashMap<>(); + String username = conf.get(HADOOP_HTTP_STATIC_USER, + DEFAULT_HADOOP_HTTP_STATIC_USER); + options.put(HADOOP_HTTP_STATIC_USER, username); + HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), + "static_user_filter_timeline", + StaticUserWebFilter.StaticUserFilter.class.getName(), + options, new String[] {"/*"}); + + timelineRestServer.addJerseyResourcePackage( + TimelineAggregatorWebService.class.getPackage().getName() + ";" + + GenericExceptionHandler.class.getPackage().getName() + ";" + + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), + "/*"); + timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY, + TimelineAggregatorsCollection.getInstance()); + timelineRestServer.start(); + } catch (Exception e) { + String msg = "The per-node aggregator webapp failed to start."; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java deleted file mode 100644 index c0af8c5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - - -public class TestAppLevelAggregatorService { -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java deleted file mode 100644 index 3f981c7..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -public class TestAppLevelServiceManager { - - @Test(timeout=60000) - public void testMultithreadedAdd() throws Exception { - final AppLevelServiceManager serviceManager = - spy(new AppLevelServiceManager()); - doReturn(new Configuration()).when(serviceManager).getConfig(); - - final int NUM_APPS = 5; - List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(); - for (int i = 0; i < NUM_APPS; i++) { - final String appId = String.valueOf(i); - Callable<Boolean> task = new Callable<Boolean>() { - public Boolean call() { - return serviceManager.addService(appId); - } - }; - tasks.add(task); - } - ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); - try { - List<Future<Boolean>> futures = executor.invokeAll(tasks); - for (Future<Boolean> future: futures) { - assertTrue(future.get()); - } - } finally { - executor.shutdownNow(); - } - // check the keys - for (int i = 0; i < NUM_APPS; i++) { - assertTrue(serviceManager.hasService(String.valueOf(i))); - } - } - - @Test - public void testMultithreadedAddAndRemove() throws Exception { - final AppLevelServiceManager serviceManager = - spy(new AppLevelServiceManager()); - doReturn(new Configuration()).when(serviceManager).getConfig(); - - final int NUM_APPS = 5; - List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(); - for (int i = 0; i < NUM_APPS; i++) { - final String appId = String.valueOf(i); - Callable<Boolean> task = new Callable<Boolean>() { - public 
Boolean call() { - return serviceManager.addService(appId) && - serviceManager.removeService(appId); - } - }; - tasks.add(task); - } - ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); - try { - List<Future<Boolean>> futures = executor.invokeAll(tasks); - for (Future<Boolean> future: futures) { - assertTrue(future.get()); - } - } finally { - executor.shutdownNow(); - } - // check the keys - for (int i = 0; i < NUM_APPS; i++) { - assertFalse(serviceManager.hasService(String.valueOf(i))); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java new file mode 100644 index 0000000..8f95814 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + + +/** Empty placeholder test class: it currently declares no fields or test methods. */ +public class TestAppLevelTimelineAggregator { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java deleted file mode 100644 index 55953cd..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -public class TestBaseAggregatorService { - -}