http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java new file mode 100644 index 0000000..009fa63 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.collectormanager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; + +public class NMCollectorService extends CompositeService implements + CollectorNodemanagerProtocol { + + private static final Log LOG = LogFactory.getLog(NMCollectorService.class); + + final Context context; + + private Server server; + + public NMCollectorService(Context context) { + + super(NMCollectorService.class.getName()); + this.context = context; + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + + InetSocketAddress collectorServerAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); + + Configuration serverConf = new Configuration(conf); + + // TODO Security settings. + YarnRPC rpc = YarnRPC.create(conf); + + server = + rpc.getServer(CollectorNodemanagerProtocol.class, this, + collectorServerAddress, serverConf, + this.context.getNMTokenSecretManager(), + conf.getInt(YarnConfiguration.NM_COLLECTOR_SERVICE_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT)); + + server.start(); + // start remaining services + super.serviceStart(); + LOG.info("NMCollectorService started at " + collectorServerAddress); + } + + + @Override + public void serviceStop() throws Exception { + if (server != null) { + server.stop(); + } + // TODO may cleanup app collectors running on this NM in future. + super.serviceStop(); + } + + @Override + public ReportNewCollectorInfoResponse reportNewCollectorInfo( + ReportNewCollectorInfoRequest request) throws IOException { + List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList(); + if (newCollectorsList != null && !newCollectorsList.isEmpty()) { + Map<ApplicationId, String> newCollectorsMap = + new HashMap<ApplicationId, String>(); + for (AppCollectorsMap collector : newCollectorsList) { + newCollectorsMap.put(collector.getApplicationId(), collector.getCollectorAddr()); + } + ((NodeManager.NMContext)context).addRegisteredCollectors(newCollectorsMap); + } + + return ReportNewCollectorInfoResponse.newInstance(); + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index ba5033f..cc82401 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -424,10 +424,11 @@ public class ApplicationImpl implements Application { new LogHandlerAppFinishedEvent(app.appId)); app.context.getNMTokenSecretManager().appFinished(app.getAppId()); - // Remove aggregator info for finished apps. - // TODO check we remove related aggregators info in failure cases (YARN-3038) - app.context.getRegisteredAggregators().remove(app.getAppId()); - app.context.getKnownAggregators().remove(app.getAppId()); + // Remove collectors info for finished apps. + // TODO check we remove related collectors info in failure cases + // (YARN-3038) + app.context.getRegisteredCollectors().remove(app.getAppId()); + app.context.getKnownCollectors().remove(app.getAppId()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 30c9200..db7eda2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -351,8 +351,8 @@ public class ApplicationMasterService extends AbstractService implements RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId.getApplicationId()); - // Remove aggregator address when app get finished. - rmApp.removeAggregatorAddr(); + // Remove collector address when app get finished. + rmApp.removeCollectorAddr(); // checking whether the app exits in RMStateStore at first not to throw // ApplicationDoesNotExistInCacheException before and after // RM work-preserving restart. @@ -601,10 +601,10 @@ public class ApplicationMasterService extends AbstractService implements allocateResponse.setAvailableResources(allocation.getResourceLimit()); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - - // add aggregator address for this application - allocateResponse.setAggregatorAddr( - this.rmContext.getRMApps().get(applicationId).getAggregatorAddr()); + + // add collector address for this application + allocateResponse.setCollectorAddr( + this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); // add preemption to the allocateResponse message (if any) allocateResponse http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 052655f..e98fefc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -445,11 +445,11 @@ public class ResourceTrackerService extends AbstractService implements return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC, message); } - - // Check & update aggregators info from request. + + // Check & update collectors info from request. // TODO make sure it won't have race condition issue for AM failed over case // that the older registration could possible override the newer one. - updateAppAggregatorsMap(request); + updateAppCollectorsMap(request); // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils @@ -465,13 +465,14 @@ public class ResourceTrackerService extends AbstractService implements if (!systemCredentials.isEmpty()) { nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); } - - // Return aggregators' map that NM needs to know - // TODO we should optimize this to only include aggreator info that NM + + // Return collectors' map that NM needs to know + // TODO we should optimize this to only include collector info that NM // doesn't know yet. - List<ApplicationId> keepAliveApps = remoteNodeStatus.getKeepAliveApplications(); + List<ApplicationId> keepAliveApps = + remoteNodeStatus.getKeepAliveApplications(); if (keepAliveApps != null) { - setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse); + setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse); } // 4. Send status to RMNode, saving the latest response. @@ -501,48 +502,49 @@ public class ResourceTrackerService extends AbstractService implements return nodeHeartBeatResponse; } - - private void setAppAggregatorsMapToResponse( + + private void setAppCollectorsMapToResponse( List<ApplicationId> liveApps, NodeHeartbeatResponse response) { - Map<ApplicationId, String> liveAppAggregatorsMap = new + Map<ApplicationId, String> liveAppCollectorsMap = new ConcurrentHashMap<ApplicationId, String>(); Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps(); for (ApplicationId appId : liveApps) { - String appAggregatorAddr = rmApps.get(appId).getAggregatorAddr(); - if (appAggregatorAddr != null) { - liveAppAggregatorsMap.put(appId, appAggregatorAddr); + String appCollectorAddr = rmApps.get(appId).getCollectorAddr(); + if (appCollectorAddr != null) { + liveAppCollectorsMap.put(appId, appCollectorAddr); } else { - // Log a debug info if aggregator address is not found. + // Log a debug info if collector address is not found. if (LOG.isDebugEnabled()) { - LOG.debug("Aggregator for applicaton: " + appId + " hasn't registered yet!"); + LOG.debug("Collector for applicaton: " + appId + + " hasn't registered yet!"); } } } - response.setAppAggregatorsMap(liveAppAggregatorsMap); + response.setAppCollectorsMap(liveAppCollectorsMap); } - - private void updateAppAggregatorsMap(NodeHeartbeatRequest request) { - Map<ApplicationId, String> registeredAggregatorsMap = - request.getRegisteredAggregators(); - if (registeredAggregatorsMap != null - && !registeredAggregatorsMap.isEmpty()) { + + private void updateAppCollectorsMap(NodeHeartbeatRequest request) { + Map<ApplicationId, String> registeredCollectorsMap = + request.getRegisteredCollectors(); + if (registeredCollectorsMap != null + && !registeredCollectorsMap.isEmpty()) { Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps(); - for (Map.Entry<ApplicationId, String> entry: - registeredAggregatorsMap.entrySet()) { + for (Map.Entry<ApplicationId, String> entry: + registeredCollectorsMap.entrySet()) { ApplicationId appId = entry.getKey(); - String aggregatorAddr = entry.getValue(); - if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) { + String collectorAddr = entry.getValue(); + if (collectorAddr != null && !collectorAddr.isEmpty()) { RMApp rmApp = rmApps.get(appId); if (rmApp == null) { - LOG.warn("Cannot update aggregator info because application ID: " + + LOG.warn("Cannot update collector info because application ID: " + appId + " is not found in RMContext!"); } else { - String previousAggregatorAddr = rmApp.getAggregatorAddr(); - if (previousAggregatorAddr == null || - previousAggregatorAddr != aggregatorAddr) { - // sending aggregator update event. - RMAppAggregatorUpdateEvent event = - new RMAppAggregatorUpdateEvent(appId, aggregatorAddr); + String previousCollectorAddr = rmApp.getCollectorAddr(); + if (previousCollectorAddr == null || + previousCollectorAddr != collectorAddr) { + // sending collector update event. + RMAppCollectorUpdateEvent event = + new RMAppCollectorUpdateEvent(appId, collectorAddr); rmContext.getDispatcher().getEventHandler().handle(event); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 8fbedb2..d4b758e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -174,23 +174,23 @@ public interface RMApp extends EventHandler<RMAppEvent> { * @return the tracking url for the application master. */ String getTrackingUrl(); - + /** - * The aggregator address for the application. - * @return the address for the application's aggregator. + * The collector address for the application. + * @return the address for the application's collector. */ - String getAggregatorAddr(); - + String getCollectorAddr(); + /** - * Set aggregator address for the application - * @param aggregatorAddr the address of aggregator + * Set collector address for the application + * @param collectorAddr the address of collector */ - void setAggregatorAddr(String aggregatorAddr); - + void setCollectorAddr(String collectorAddr); + /** - * Remove aggregator address when application is finished or killed. + * Remove collector address when application is finished or killed. */ - void removeAggregatorAddr(); + void removeCollectorAddr(); /** * The original tracking url for the application master. http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java deleted file mode 100644 index b43de44..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp; - -import org.apache.hadoop.yarn.api.records.ApplicationId; - -public class RMAppAggregatorUpdateEvent extends RMAppEvent { - - private final String appAggregatorAddr; - - public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) { - super(appId, RMAppEventType.AGGREGATOR_UPDATE); - this.appAggregatorAddr = appAggregatorAddr; - } - - public String getAppAggregatorAddr(){ - return this.appAggregatorAddr; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java new file mode 100644 index 0000000..698c9b5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public class RMAppCollectorUpdateEvent extends RMAppEvent { + + private final String appCollectorAddr; + + public RMAppCollectorUpdateEvent(ApplicationId appId, + String appCollectorAddr) { + super(appId, RMAppEventType.COLLECTOR_UPDATE); + this.appCollectorAddr = appCollectorAddr; + } + + public String getAppCollectorAddr(){ + return this.appCollectorAddr; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 6e9460a..2b42638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -30,9 +30,9 @@ public enum RMAppEventType { // Source: Scheduler APP_ACCEPTED, - + // TODO add source later - AGGREGATOR_UPDATE, + COLLECTOR_UPDATE, // Source: RMAppAttempt ATTEMPT_REGISTERED, http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 6536783..c4f5270 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -142,7 +142,7 @@ public class RMAppImpl implements RMApp, Recoverable { private long startTime; private long finishTime = 0; private long storedFinishTime = 0; - private String aggregatorAddr; + private String collectorAddr; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -189,7 +189,7 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.NEW, RMAppState.NEW, - RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition()) .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, @@ -207,7 +207,7 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, - RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, @@ -227,7 +227,7 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, - RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition( @@ -245,7 +245,7 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, RMAppEventType.ATTEMPT_REGISTERED) .addTransition(RMAppState.ACCEPTED, @@ -273,7 +273,7 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, - RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_UNREGISTERED, new FinalSavingTransition( @@ -304,7 +304,7 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, - RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, @@ -317,7 +317,7 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, - RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, @@ -330,7 +330,7 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) .addTransition(RMAppState.KILLING, RMAppState.KILLING, - RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( @@ -548,20 +548,20 @@ public class RMAppImpl implements RMApp, Recoverable { public void setQueue(String queue) { this.queue = queue; } - + @Override - public String getAggregatorAddr() { - return this.aggregatorAddr; + public String getCollectorAddr() { + return this.collectorAddr; } - + @Override - public void setAggregatorAddr(String aggregatorAddr) { - this.aggregatorAddr = aggregatorAddr; + public void setCollectorAddr(String collectorAddr) { + this.collectorAddr = collectorAddr; } - + @Override - public void removeAggregatorAddr() { - this.aggregatorAddr = null; + public void removeCollectorAddr() { + this.collectorAddr = null; } @Override @@ -818,8 +818,8 @@ public class RMAppImpl implements RMApp, Recoverable { .getDiagnostics()); this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); - //TODO recover aggregator address. - //this.aggregatorAddr = appState.getAggregatorAddr(); + //TODO recover collector address. + //this.collectorAddr = appState.getCollectorAddr(); for(int i=0; i<appState.getAttemptCount(); ++i) { // create attempt @@ -863,22 +863,22 @@ public class RMAppImpl implements RMApp, Recoverable { }; } - private static final class RMAppAggregatorUpdateTransition + private static final class RMAppCollectorUpdateTransition extends RMAppTransition { - + public void transition(RMAppImpl app, RMAppEvent event) { - LOG.info("Updating aggregator info for app: " + app.getApplicationId()); - - RMAppAggregatorUpdateEvent appAggregatorUpdateEvent = - (RMAppAggregatorUpdateEvent) event; - // Update aggregator address - app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr()); - + LOG.info("Updating collector info for app: " + app.getApplicationId()); + + RMAppCollectorUpdateEvent appCollectorUpdateEvent = + (RMAppCollectorUpdateEvent) event; + // Update collector address + app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr()); + // TODO persistent to RMStateStore for recover // Save to RMStateStore }; } - + private static final class RMAppNodeUpdateTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event; http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index d9cdee0..e52b054 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -94,15 +94,15 @@ public abstract class MockAsm extends MockApps { throw new UnsupportedOperationException("Not supported yet."); } @Override - public String getAggregatorAddr() { + public String getCollectorAddr() { throw new UnsupportedOperationException("Not supported yet."); } @Override - public void setAggregatorAddr(String aggregatorAddr) { + public void setCollectorAddr(String collectorAddr) { throw new UnsupportedOperationException("Not supported yet."); } @Override - public void removeAggregatorAddr() { + public void removeCollectorAddr() { throw new UnsupportedOperationException("Not supported yet."); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 83e99b8..aec5379 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -285,17 +285,17 @@ public class MockRMApp implements RMApp { } @Override - public String getAggregatorAddr() { + public String getCollectorAddr() { throw new UnsupportedOperationException("Not supported yet."); } - + @Override - public void removeAggregatorAddr() { + public void removeCollectorAddr() { throw new UnsupportedOperationException("Not supported yet."); } @Override - public void setAggregatorAddr(String aggregatorAddr) { + public void setCollectorAddr(String collectorAddr) { throw new UnsupportedOperationException("Not supported yet."); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml index 3b7fba0..e04e858 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml @@ -56,6 +56,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 32ee5d8..fab131c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -1,25 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.yarn.server.timelineservice; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.fail; - public class TestTimelineServiceClientIntegration { - private static PerNodeTimelineAggregatorsAuxService auxService; + private static TimelineCollectorManager collectorManager; + private static PerNodeTimelineCollectorsAuxService auxService; @BeforeClass public static void setupClass() throws Exception { try { - auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]); + collectorManager = new MyTimelineCollectorManager(); + auxService = + PerNodeTimelineCollectorsAuxService.launchServer(new String[0], + collectorManager); auxService.addApplication(ApplicationId.newInstance(0, 1)); } catch (ExitUtil.ExitException e) { fail(); @@ -38,6 +63,9 @@ public class TestTimelineServiceClientIntegration { TimelineClient client = TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1)); try { + // set the timeline service address manually + client.setTimelineServiceAddress( + collectorManager.getRestServerBindAddress()); client.init(new YarnConfiguration()); client.start(); TimelineEntity entity = new TimelineEntity(); @@ -45,10 +73,20 @@ public class TestTimelineServiceClientIntegration { entity.setId("test entity id"); client.putEntities(entity); client.putEntitiesAsync(entity); - } catch(Exception e) { - fail(); } finally { client.stop(); } } + + private static class MyTimelineCollectorManager extends + TimelineCollectorManager { + public MyTimelineCollectorManager() { + super(); + } + + @Override + protected CollectorNodemanagerProtocol getNMCollectorService() { + return mock(CollectorNodemanagerProtocol.class); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index 26790f1..f974aee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -58,6 +58,11 @@ <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId> </dependency> @@ -72,6 +77,11 @@ </dependency> <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java deleted file mode 100644 index 95ec9f8..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; - -/** - * Service that handles writes to the timeline service and writes them to the - * backing storage for a given YARN application. - * - * App-related lifecycle management is handled by this service. - */ -@Private -@Unstable -public class AppLevelTimelineAggregator extends TimelineAggregator { - private final String applicationId; - // TODO define key metadata such as flow metadata, user, and queue - - public AppLevelTimelineAggregator(String applicationId) { - super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId); - this.applicationId = applicationId; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java deleted file mode 100644 index 19920fd..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import java.nio.ByteBuffer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; -import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; -import org.apache.hadoop.yarn.server.api.AuxiliaryService; -import org.apache.hadoop.yarn.server.api.ContainerContext; -import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; -import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; - -import com.google.common.annotations.VisibleForTesting; - -/** - * The top-level server for the per-node timeline aggregator collection. Currently - * it is defined as an auxiliary service to accommodate running within another - * daemon (e.g. node manager). - */ -@Private -@Unstable -public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService { - private static final Log LOG = - LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class); - private static final int SHUTDOWN_HOOK_PRIORITY = 30; - - private final TimelineAggregatorsCollection aggregatorCollection; - - public PerNodeTimelineAggregatorsAuxService() { - // use the same singleton - this(TimelineAggregatorsCollection.getInstance()); - } - - @VisibleForTesting PerNodeTimelineAggregatorsAuxService( - TimelineAggregatorsCollection aggregatorCollection) { - super("timeline_aggregator"); - this.aggregatorCollection = aggregatorCollection; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - aggregatorCollection.init(conf); - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - aggregatorCollection.start(); - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - aggregatorCollection.stop(); - super.serviceStop(); - } - - // these methods can be used as the basis for future service methods if the - // per-node aggregator runs separate from the node manager - /** - * Creates and adds an app level aggregator for the specified application id. - * The aggregator is also initialized and started. If the service already - * exists, no new service is created. - * - * @return whether it was added successfully - */ - public boolean addApplication(ApplicationId appId) { - AppLevelTimelineAggregator aggregator = - new AppLevelTimelineAggregator(appId.toString()); - return (aggregatorCollection.putIfAbsent(appId, aggregator) - == aggregator); - } - - /** - * Removes the app level aggregator for the specified application id. The - * aggregator is also stopped as a result. If the aggregator does not exist, no - * change is made. - * - * @return whether it was removed successfully - */ - public boolean removeApplication(ApplicationId appId) { - String appIdString = appId.toString(); - return aggregatorCollection.remove(appIdString); - } - - /** - * Creates and adds an app level aggregator for the specified application id. - * The aggregator is also initialized and started. If the aggregator already - * exists, no new aggregator is created. - */ - @Override - public void initializeContainer(ContainerInitializationContext context) { - // intercept the event of the AM container being created and initialize the - // app level aggregator service - if (isApplicationMaster(context)) { - ApplicationId appId = context.getContainerId(). - getApplicationAttemptId().getApplicationId(); - addApplication(appId); - } - } - - /** - * Removes the app level aggregator for the specified application id. The - * aggregator is also stopped as a result. If the aggregator does not exist, no - * change is made. - */ - @Override - public void stopContainer(ContainerTerminationContext context) { - // intercept the event of the AM container being stopped and remove the app - // level aggregator service - if (isApplicationMaster(context)) { - ApplicationId appId = context.getContainerId(). - getApplicationAttemptId().getApplicationId(); - removeApplication(appId); - } - } - - private boolean isApplicationMaster(ContainerContext context) { - // TODO this is based on a (shaky) assumption that the container id (the - // last field of the full container id) for an AM is always 1 - // we want to make this much more reliable - ContainerId containerId = context.getContainerId(); - return containerId.getContainerId() == 1L; - } - - @VisibleForTesting - boolean hasApplication(String appId) { - return aggregatorCollection.containsKey(appId); - } - - @Override - public void initializeApplication(ApplicationInitializationContext context) { - } - - @Override - public void stopApplication(ApplicationTerminationContext context) { - } - - @Override - public ByteBuffer getMetaData() { - // TODO currently it is not used; we can return a more meaningful data when - // we connect it with an AM - return ByteBuffer.allocate(0); - } - - @VisibleForTesting - public static PerNodeTimelineAggregatorsAuxService launchServer(String[] args) { - Thread - .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); - StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, args, - LOG); - PerNodeTimelineAggregatorsAuxService auxService = null; - try { - auxService = new PerNodeTimelineAggregatorsAuxService(); - ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService), - SHUTDOWN_HOOK_PRIORITY); - YarnConfiguration conf = new YarnConfiguration(); - auxService.init(conf); - auxService.start(); - } catch (Throwable t) { - LOG.fatal("Error starting PerNodeAggregatorServer", t); - ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer"); - } - return auxService; - } - - private static class ShutdownHook implements Runnable { - private final PerNodeTimelineAggregatorsAuxService auxService; - - public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) { - this.auxService = auxService; - } - - public void run() { - auxService.stop(); - } - } - - public static void main(String[] args) { - launchServer(args); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java deleted file mode 100644 index dbd0895..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; -import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; -import org.apache.hadoop.util.ReflectionUtils; -/** - * Service that handles writes to the timeline service and writes them to the - * backing storage. - * - * Classes that extend this can add their own lifecycle management or - * customization of request handling. - */ -@Private -@Unstable -public abstract class TimelineAggregator extends CompositeService { - private static final Log LOG = LogFactory.getLog(TimelineAggregator.class); - - private TimelineWriter writer; - - public TimelineAggregator(String name) { - super(name); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - writer = ReflectionUtils.newInstance(conf.getClass( - YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, - FileSystemTimelineWriterImpl.class, - TimelineWriter.class), conf); - writer.init(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - writer.stop(); - } - - public TimelineWriter getWriter() { - return writer; - } - - /** - * Handles entity writes. These writes are synchronous and are written to the - * backing storage without buffering/batching. If any entity already exists, - * it results in an update of the entity. - * - * This method should be reserved for selected critical entities and events. - * For normal voluminous writes one should use the async method - * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. - * - * @param entities entities to post - * @param callerUgi the caller UGI - * @return the response that contains the result of the post. - */ - public TimelineWriteResponse postEntities(TimelineEntities entities, - UserGroupInformation callerUgi) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); - LOG.debug("postEntities(entities=" + entities + ", callerUgi=" - + callerUgi + ")"); - } - - return writer.write(entities); - } - - /** - * Handles entity writes in an asynchronous manner. The method returns as soon - * as validation is done. No promises are made on how quickly it will be - * written to the backing storage or if it will always be written to the - * backing storage. Multiple writes to the same entities may be batched and - * appropriate values updated and result in fewer writes to the backing - * storage. - * - * @param entities entities to post - * @param callerUgi the caller UGI - */ - public void postEntitiesAsync(TimelineEntities entities, - UserGroupInformation callerUgi) { - // TODO implement - if (LOG.isDebugEnabled()) { - LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + - callerUgi + ")"); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java deleted file mode 100644 index 7d42f94..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import javax.servlet.ServletContext; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.*; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.webapp.ForbiddenException; -import org.apache.hadoop.yarn.webapp.NotFoundException; - -import com.google.inject.Singleton; - -/** - * The main per-node REST end point for timeline service writes. It is - * essentially a container service that routes requests to the appropriate - * per-app services. - */ -@Private -@Unstable -@Singleton -@Path("/ws/v2/timeline") -public class TimelineAggregatorWebService { - private static final Log LOG = - LogFactory.getLog(TimelineAggregatorWebService.class); - - private @Context ServletContext context; - - @XmlRootElement(name = "about") - @XmlAccessorType(XmlAccessType.NONE) - @Public - @Unstable - public static class AboutInfo { - - private String about; - - public AboutInfo() { - - } - - public AboutInfo(String about) { - this.about = about; - } - - @XmlElement(name = "About") - public String getAbout() { - return about; - } - - public void setAbout(String about) { - this.about = about; - } - - } - - /** - * Return the description of the timeline web services. - */ - @GET - @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public AboutInfo about( - @Context HttpServletRequest req, - @Context HttpServletResponse res) { - init(res); - return new AboutInfo("Timeline API"); - } - - /** - * Accepts writes to the aggregator, and returns a response. It simply routes - * the request to the app level aggregator. It expects an application as a - * context. - */ - @PUT - @Path("/entities") - @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public Response putEntities( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - @QueryParam("async") String async, - @QueryParam("appid") String appId, - TimelineEntities entities) { - init(res); - UserGroupInformation callerUgi = getUser(req); - if (callerUgi == null) { - String msg = "The owner of the posted timeline entities is not set"; - LOG.error(msg); - throw new ForbiddenException(msg); - } - - // TODO how to express async posts and handle them - boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); - - try { - appId = parseApplicationId(appId); - if (appId == null) { - return Response.status(Response.Status.BAD_REQUEST).build(); - } - TimelineAggregator service = getAggregatorService(req, appId); - if (service == null) { - LOG.error("Application not found"); - throw new NotFoundException(); // different exception? - } - service.postEntities(entities, callerUgi); - return Response.ok().build(); - } catch (Exception e) { - LOG.error("Error putting entities", e); - throw new WebApplicationException(e, - Response.Status.INTERNAL_SERVER_ERROR); - } - } - - private String parseApplicationId(String appId) { - // Make sure the appId is not null and is valid - ApplicationId appID; - try { - if (appId != null) { - return ConverterUtils.toApplicationId(appId.trim()).toString(); - } else { - return null; - } - } catch (Exception e) { - return null; - } - } - - private TimelineAggregator - getAggregatorService(HttpServletRequest req, String appIdToParse) { - String appIdString = parseApplicationId(appIdToParse); - final TimelineAggregatorsCollection aggregatorCollection = - (TimelineAggregatorsCollection) context.getAttribute( - TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY); - return aggregatorCollection.get(appIdString); - } - - private void init(HttpServletResponse response) { - response.setContentType(null); - } - - private UserGroupInformation getUser(HttpServletRequest req) { - String remoteUser = req.getRemoteUser(); - UserGroupInformation callerUgi = null; - if (remoteUser != null) { - callerUgi = UserGroupInformation.createRemoteUser(remoteUser); - } - return callerUgi; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java deleted file mode 100644 index d6e2a18..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import java.io.IOException; -import java.net.URI; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.http.lib.StaticUserWebFilter; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; -import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; - -import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; - -/** - * Class that manages adding and removing aggregators and their lifecycle. It - * provides thread safety access to the aggregators inside. - * - * It is a singleton, and instances should be obtained via - * {@link #getInstance()}. - */ -@Private -@Unstable -public class TimelineAggregatorsCollection extends CompositeService { - private static final Log LOG = - LogFactory.getLog(TimelineAggregatorsCollection.class); - private static final TimelineAggregatorsCollection INSTANCE = - new TimelineAggregatorsCollection(); - - // access to this map is synchronized with the map itself - private final Map<String, TimelineAggregator> aggregators = - Collections.synchronizedMap( - new HashMap<String, TimelineAggregator>()); - - // REST server for this aggregator collection - private HttpServer2 timelineRestServer; - - private String timelineRestServerBindAddress; - - private AggregatorNodemanagerProtocol nmAggregatorService; - - private InetSocketAddress nmAggregatorServiceAddress; - - static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection"; - - static TimelineAggregatorsCollection getInstance() { - return INSTANCE; - } - - TimelineAggregatorsCollection() { - super(TimelineAggregatorsCollection.class.getName()); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - this.nmAggregatorServiceAddress = conf.getSocketAddr( - YarnConfiguration.NM_BIND_HOST, - YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT); - - } - - @Override - protected void serviceStart() throws Exception { - startWebApp(); - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - if (timelineRestServer != null) { - timelineRestServer.stop(); - } - super.serviceStop(); - } - - /** - * Put the aggregator into the collection if an aggregator mapped by id does - * not exist. - * - * @throws YarnRuntimeException if there was any exception in initializing and - * starting the app level service - * @return the aggregator associated with id after the potential put. - */ - public TimelineAggregator putIfAbsent(ApplicationId appId, - TimelineAggregator aggregator) { - String id = appId.toString(); - TimelineAggregator aggregatorInTable; - boolean aggregatorIsNew = false; - synchronized (aggregators) { - aggregatorInTable = aggregators.get(id); - if (aggregatorInTable == null) { - try { - // initialize, start, and add it to the collection so it can be - // cleaned up when the parent shuts down - aggregator.init(getConfig()); - aggregator.start(); - aggregators.put(id, aggregator); - LOG.info("the aggregator for " + id + " was added"); - aggregatorInTable = aggregator; - aggregatorIsNew = true; - } catch (Exception e) { - throw new YarnRuntimeException(e); - } - } else { - String msg = "the aggregator for " + id + " already exists!"; - LOG.error(msg); - } - - } - // Report to NM if a new aggregator is added. - if (aggregatorIsNew) { - try { - reportNewAggregatorToNM(appId); - } catch (Exception e) { - // throw exception here as it cannot be used if failed report to NM - LOG.error("Failed to report a new aggregator for application: " + appId + - " to NM Aggregator Services."); - throw new YarnRuntimeException(e); - } - } - - return aggregatorInTable; - } - - /** - * Removes the aggregator for the specified id. The aggregator is also stopped - * as a result. If the aggregator does not exist, no change is made. - * - * @return whether it was removed successfully - */ - public boolean remove(String id) { - synchronized (aggregators) { - TimelineAggregator aggregator = aggregators.remove(id); - if (aggregator == null) { - String msg = "the aggregator for " + id + " does not exist!"; - LOG.error(msg); - return false; - } else { - // stop the service to do clean up - aggregator.stop(); - LOG.info("the aggregator service for " + id + " was removed"); - return true; - } - } - } - - /** - * Returns the aggregator for the specified id. - * - * @return the aggregator or null if it does not exist - */ - public TimelineAggregator get(String id) { - return aggregators.get(id); - } - - /** - * Returns whether the aggregator for the specified id exists in this - * collection. - */ - public boolean containsKey(String id) { - return aggregators.containsKey(id); - } - - /** - * Launch the REST web server for this aggregator collection - */ - private void startWebApp() { - Configuration conf = getConfig(); - // use the same ports as the old ATS for now; we could create new properties - // for the new timeline service if needed - String bindAddress = WebAppUtils.getWebAppBindURL(conf, - YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, - WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); - this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress( - NetUtils.createSocketAddr(bindAddress)); - LOG.info("Instantiating the per-node aggregator webapp at " + - timelineRestServerBindAddress); - try { - Configuration confForInfoServer = new Configuration(conf); - confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); - HttpServer2.Builder builder = new HttpServer2.Builder() - .setName("timeline") - .setConf(conf) - .addEndpoint(URI.create("http://" + bindAddress)); - timelineRestServer = builder.build(); - // TODO: replace this by an authentication filter in future. - HashMap<String, String> options = new HashMap<>(); - String username = conf.get(HADOOP_HTTP_STATIC_USER, - DEFAULT_HADOOP_HTTP_STATIC_USER); - options.put(HADOOP_HTTP_STATIC_USER, username); - HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), - "static_user_filter_timeline", - StaticUserWebFilter.StaticUserFilter.class.getName(), - options, new String[] {"/*"}); - - timelineRestServer.addJerseyResourcePackage( - TimelineAggregatorWebService.class.getPackage().getName() + ";" - + GenericExceptionHandler.class.getPackage().getName() + ";" - + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), - "/*"); - timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY, - TimelineAggregatorsCollection.getInstance()); - timelineRestServer.start(); - } catch (Exception e) { - String msg = "The per-node aggregator webapp failed to start."; - LOG.error(msg, e); - throw new YarnRuntimeException(msg, e); - } - } - - private void reportNewAggregatorToNM(ApplicationId appId) - throws YarnException, IOException { - this.nmAggregatorService = getNMAggregatorService(); - ReportNewAggregatorsInfoRequest request = - ReportNewAggregatorsInfoRequest.newInstance(appId, - this.timelineRestServerBindAddress); - LOG.info("Report a new aggregator for application: " + appId + - " to NM Aggregator Services."); - nmAggregatorService.reportNewAggregatorInfo(request); - } - - // protected for test - protected AggregatorNodemanagerProtocol getNMAggregatorService(){ - Configuration conf = getConfig(); - final YarnRPC rpc = YarnRPC.create(conf); - - // TODO Security settings. - return (AggregatorNodemanagerProtocol) rpc.getProxy( - AggregatorNodemanagerProtocol.class, - nmAggregatorServiceAddress, conf); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java new file mode 100644 index 0000000..7d59876 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage for a given YARN application. + * + * App-related lifecycle management is handled by this service. + */ +@Private +@Unstable +public class AppLevelTimelineCollector extends TimelineCollector { + private final String applicationId; + // TODO define key metadata such as flow metadata, user, and queue + + public AppLevelTimelineCollector(String applicationId) { + super(AppLevelTimelineCollector.class.getName() + " - " + applicationId); + this.applicationId = applicationId; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + +}
