http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 53d8ae5..45ff79c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -152,6 +153,7 @@ public class RMAppImpl implements RMApp, Recoverable { private long storedFinishTime = 0; private int firstAttemptIdInStateStore = 1; private int nextAttemptId = 1; + private String collectorAddr; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -199,6 +201,8 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from NEW state .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW, + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition()) .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, @@ -215,6 +219,8 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from NEW_SAVING state .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, @@ -233,6 +239,8 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition( @@ -249,6 +257,8 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition( YarnApplicationState.RUNNING)) @@ -276,6 +286,8 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_UNREGISTERED, new FinalSavingTransition( @@ -305,6 +317,8 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, @@ -316,6 +330,8 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, @@ -327,6 +343,8 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.KILLING, RMAppState.KILLING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( @@ -398,8 +416,19 @@ public class RMAppImpl implements RMApp, Recoverable { Configuration config, String name, String user, String queue, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationMasterService masterService, long submitTime, - String applicationType, Set<String> applicationTags, + String applicationType, Set<String> applicationTags, ResourceRequest amReq) { + this(applicationId, rmContext, config, name, user, queue, submissionContext, + scheduler, masterService, submitTime, applicationType, applicationTags, + amReq, -1); + } + + public RMAppImpl(ApplicationId applicationId, RMContext rmContext, + Configuration config, String name, String user, String queue, + ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, + ApplicationMasterService masterService, long submitTime, + String applicationType, Set<String> applicationTags, + ResourceRequest amReq, long startTime) { this.systemClock = SystemClock.getInstance(); @@ -415,7 +444,11 @@ public class RMAppImpl implements RMApp, Recoverable { this.scheduler = scheduler; this.masterService = masterService; this.submitTime = submitTime; - this.startTime = this.systemClock.getTime(); + if (startTime <= 0) { + this.startTime = this.systemClock.getTime(); + } else { + this.startTime = startTime; + } this.applicationType = applicationType; this.applicationTags = applicationTags; this.amReq = amReq; @@ -493,6 +526,25 @@ public class RMAppImpl implements RMApp, Recoverable { } } + /** + * Starts the application level timeline collector for this app. This should + * be used only if the timeline service v.2 is enabled. + */ + public void startTimelineCollector() { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(applicationId); + rmContext.getRMTimelineCollectorManager().putIfAbsent( + applicationId, collector); + } + + /** + * Stops the application level timeline collector for this app. This should be + * used only if the timeline service v.2 is enabled. + */ + public void stopTimelineCollector() { + rmContext.getRMTimelineCollectorManager().remove(applicationId); + } + @Override public ApplicationId getApplicationId() { return this.applicationId; @@ -562,6 +614,21 @@ public class RMAppImpl implements RMApp, Recoverable { } @Override + public String getCollectorAddr() { + return this.collectorAddr; + } + + @Override + public void setCollectorAddr(String collectorAddress) { + this.collectorAddr = collectorAddress; + } + + @Override + public void removeCollectorAddr() { + this.collectorAddr = null; + } + + @Override public String getName() { return this.name; } @@ -829,10 +896,11 @@ public class RMAppImpl implements RMApp, Recoverable { this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); this.nextAttemptId = firstAttemptIdInStateStore; } + //TODO recover collector address. + //this.collectorAddr = appState.getCollectorAddr(); // send the ATS create Event sendATSCreateEvent(this, this.startTime); - RMAppAttemptImpl preAttempt = null; for (ApplicationAttemptId attemptId : new TreeSet<>(appState.attempts.keySet())) { @@ -902,7 +970,24 @@ public class RMAppImpl implements RMApp, Recoverable { SingleArcTransition<RMAppImpl, RMAppEvent> { public void transition(RMAppImpl app, RMAppEvent event) { }; + } + + private static final class RMAppCollectorUpdateTransition + extends RMAppTransition { + + public void transition(RMAppImpl app, RMAppEvent event) { + if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) { + LOG.info("Updating collector info for app: " + app.getApplicationId()); + RMAppCollectorUpdateEvent appCollectorUpdateEvent = + (RMAppCollectorUpdateEvent) event; + // Update collector address + app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr()); + + // TODO persistent to RMStateStore for recover + // Save to RMStateStore + } + }; } private static final class RMAppNodeUpdateTransition extends RMAppTransition { @@ -1797,7 +1882,7 @@ public class RMAppImpl implements RMApp, Recoverable { } return amNodeLabelExpression; } - + @Override public CallerContext getCallerContext() { return callerContext;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java new file mode 100644 index 0000000..64c3749 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.timelineservice; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +/** + * This class extends TimelineCollectorManager to provide RM specific + * implementations. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class RMTimelineCollectorManager extends TimelineCollectorManager { + private static final Log LOG = + LogFactory.getLog(RMTimelineCollectorManager.class); + + private RMContext rmContext; + + public RMTimelineCollectorManager(RMContext rmContext) { + super(RMTimelineCollectorManager.class.getName()); + this.rmContext = rmContext; + } + + @Override + protected void doPostPut(ApplicationId appId, TimelineCollector collector) { + RMApp app = rmContext.getRMApps().get(appId); + if (app == null) { + throw new YarnRuntimeException( + "Unable to get the timeline collector context info for a " + + "non-existing app " + appId); + } + String userId = app.getUser(); + TimelineCollectorContext context = collector.getTimelineEntityContext(); + if (userId != null && !userId.isEmpty()) { + context.setUserId(userId); + } + + // initialize the flow in the environment with default values for those + // that do not specify the flow tags + // flow name: app name (or app id if app name is missing), + // flow version: "1", flow run id: start time + context.setFlowName(TimelineUtils.generateDefaultFlowName( + app.getName(), appId)); + context.setFlowVersion(TimelineUtils.DEFAULT_FLOW_VERSION); + context.setFlowRunId(app.getStartTime()); + + // the flow context is received via the application tags + for (String tag : app.getApplicationTags()) { + String[] parts = tag.split(":", 2); + if (parts.length != 2 || parts[1].isEmpty()) { + continue; + } + switch (parts[0].toUpperCase()) { + case TimelineUtils.FLOW_NAME_TAG_PREFIX: + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow name: " + parts[1]); + } + context.setFlowName(parts[1]); + break; + case TimelineUtils.FLOW_VERSION_TAG_PREFIX: + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow version: " + parts[1]); + } + context.setFlowVersion(parts[1]); + break; + case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX: + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow run id: " + parts[1]); + } + context.setFlowRunId(Long.parseLong(parts[1])); + break; + default: + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/package-info.java new file mode 100644 index 0000000..c470011 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.resourcemanager.timelineservice + * contains classes related to handling of app level collectors. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 075df47..c3d04de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -190,9 +191,11 @@ public class TestAppManager{ public int getCompletedAppsListSize() { return super.getCompletedAppsListSize(); } + public int getCompletedAppsInStateStore() { return this.completedAppsInStateStore; } + public void submitApplication( ApplicationSubmissionContext submissionContext, String user) throws YarnException, IOException { @@ -223,6 +226,8 @@ public class TestAppManager{ long now = System.currentTimeMillis(); rmContext = mockRMContext(1, now - 10); + rmContext + .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class)); ResourceScheduler scheduler = mockResourceScheduler(); ((RMContextImpl)rmContext).setScheduler(scheduler); Configuration conf = new Configuration(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 88a4637..ef89509 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -24,6 +24,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -140,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; @@ -637,6 +639,9 @@ public class TestClientRMService { new EventHandler<Event>() { public void handle(Event event) {} }); + doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) + .getRMTimelineCollectorManager(); + ApplicationId appId1 = getApplicationId(100); ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); @@ -652,6 +657,7 @@ public class TestClientRMService { ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, appManager, mockAclsManager, mockQueueACLsManager, null); + rmService.init(new Configuration()); // without name and queue @@ -726,6 +732,9 @@ public class TestClientRMService { mockRMContext(yarnScheduler, rmContext); RMStateStore stateStore = mock(RMStateStore.class); when(rmContext.getStateStore()).thenReturn(stateStore); + doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) + .getRMTimelineCollectorManager(); + RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, null, mock(ApplicationACLsManager.class), new Configuration()); when(rmContext.getDispatcher().getEventHandler()).thenReturn( @@ -742,6 +751,7 @@ public class TestClientRMService { ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, appManager, mockAclsManager, mockQueueACLsManager, null); + rmService.init(new Configuration()); // Initialize appnames and queues String[] queues = {QUEUE_1, QUEUE_2}; @@ -899,10 +909,13 @@ public class TestClientRMService { }; when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler); - + doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) + .getRMTimelineCollectorManager(); + final ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, appManager, null, null, null); + rmService.init(new Configuration()); // submit an app and wait for it to block while in app submission Thread t = new Thread() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 9162258..b271b37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -109,6 +109,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; @@ -1126,6 +1127,68 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { } @Test (timeout = 60000) + public void testRMRestartTimelineCollectorContext() throws Exception { + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map<ApplicationId, ApplicationStateData> rmAppState = + rmState.getApplicationState(); + MockRM rm1 = null; + MockRM rm2 = null; + try { + rm1 = createMockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit an app. + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap<ApplicationAccessType, String>(), false, "default", -1, + null); + // Check if app info has been saved. + ApplicationStateData appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app.getApplicationSubmissionContext() + .getApplicationId()); + + // Allocate the AM + nm1.nodeHeartbeat(true); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + + ApplicationId appId = app.getApplicationId(); + TimelineCollectorContext contextBeforeRestart = + rm1.getRMContext().getRMTimelineCollectorManager().get(appId). + getTimelineEntityContext(); + + // Restart RM. + rm2 = createMockRM(conf, memStore); + rm2.start(); + Assert.assertEquals(1, rm2.getRMContext().getRMApps().size()); + rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + TimelineCollectorContext contextAfterRestart = + rm2.getRMContext().getRMTimelineCollectorManager().get(appId). + getTimelineEntityContext(); + Assert.assertEquals("Collector contexts for an app should be same " + + "across restarts", contextBeforeRestart, contextAfterRestart); + } finally { + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); + if (rm1 != null) { + rm1.close(); + } + if (rm2 != null) { + rm2.close(); + } + } + } + + @Test (timeout = 60000) public void testDelegationTokenRestoredInDelegationTokenRenewer() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index eb0d5f1..098ba54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; @@ -68,8 +70,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -864,6 +869,87 @@ public class TestResourceTrackerService extends NodeLabelTestBase { checkRebootedNMCount(rm, ++initialMetricCount); } + @Test + public void testNodeHeartbeatForAppCollectorsMap() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // set version to 2 + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + // enable aux-service based timeline collectors + conf.set(YarnConfiguration.NM_AUX_SERVICES, "timeline_collector"); + conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + + "timeline_collector" + ".class", + PerNodeTimelineCollectorsAuxService.class.getName()); + + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:1234", 2048); + + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); + + RMNodeImpl node1 = + (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + + RMNodeImpl node2 = + (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + + RMApp app1 = rm.submitApp(1024); + String collectorAddr1 = "1.2.3.4:5"; + app1.setCollectorAddr(collectorAddr1); + + String collectorAddr2 = "5.4.3.2:1"; + RMApp app2 = rm.submitApp(1024); + app2.setCollectorAddr(collectorAddr2); + + // Create a running container for app1 running on nm1 + ContainerId runningContainerId1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + app1.getApplicationId(), 0), 0); + + ContainerStatus status1 = ContainerStatus.newInstance(runningContainerId1, + ContainerState.RUNNING, "", 0); + List<ContainerStatus> statusList = new ArrayList<ContainerStatus>(); + statusList.add(status1); + NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true, + "", System.currentTimeMillis()); + NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0, + statusList, null, nodeHealth, null, null, null); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus, + nodeHeartbeat1)); + + Assert.assertEquals(1, node1.getRunningApps().size()); + Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0)); + + // Create a running container for app2 running on nm2 + ContainerId runningContainerId2 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + app2.getApplicationId(), 0), 0); + + ContainerStatus status2 = ContainerStatus.newInstance(runningContainerId2, + ContainerState.RUNNING, "", 0); + statusList = new ArrayList<ContainerStatus>(); + statusList.add(status2); + nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0, + statusList, null, nodeHealth, null, null, null); + node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus, + nodeHeartbeat2)); + Assert.assertEquals(1, node2.getRunningApps().size()); + Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); + + nodeHeartbeat1 = nm1.nodeHeartbeat(true); + Map<ApplicationId, String> map1 = nodeHeartbeat1.getAppCollectorsMap(); + Assert.assertEquals(1, map1.size()); + Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId())); + + nodeHeartbeat2 = nm2.nodeHeartbeat(true); + Map<ApplicationId, String> map2 = nodeHeartbeat2.getAppCollectorsMap(); + Assert.assertEquals(1, map2.size()); + Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId())); + } + private void checkRebootedNMCount(MockRM rm2, int count) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 560a305..19ee0b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -95,6 +95,18 @@ public abstract class MockAsm extends MockApps { throw new UnsupportedOperationException("Not supported yet."); } @Override + public String getCollectorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override + public void setCollectorAddr(String collectorAddr) { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override + public void removeCollectorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public ApplicationId getApplicationId() { throw new UnsupportedOperationException("Not supported yet."); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 087199d..7dafc22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -89,9 +90,12 @@ public class TestRMAppLogAggregationStatus { rmContext = new RMContextImpl(rmDispatcher, null, null, null, null, null, null, null, null); - rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher()); + rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class)); + rmContext + .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class)); + scheduler = mock(YarnScheduler.class); doAnswer( new Answer<Void>() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index 9ea2baa..1e279d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -68,14 +68,14 @@ import org.junit.Test; public class TestSystemMetricsPublisher { private static ApplicationHistoryServer timelineServer; - private static SystemMetricsPublisher metricsPublisher; + private static TimelineServiceV1Publisher metricsPublisher; private static TimelineStore store; @BeforeClass public static void setup() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, MemoryTimelineStore.class, TimelineStore.class); conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, @@ -89,7 +89,7 @@ public class TestSystemMetricsPublisher { timelineServer.start(); store = timelineServer.getTimelineStore(); - metricsPublisher = new SystemMetricsPublisher(); + metricsPublisher = new TimelineServiceV1Publisher(); metricsPublisher.init(conf); metricsPublisher.start(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java new file mode 100644 index 0000000..3ea4714 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -0,0 +1,398 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSystemMetricsPublisherForV2 { + + /** + * The folder where the FileSystemTimelineWriterImpl writes the entities. + */ + private static File testRootDir = new File("target", + TestSystemMetricsPublisherForV2.class.getName() + "-localDir") + .getAbsoluteFile(); + + private static TimelineServiceV2Publisher metricsPublisher; + private static DrainDispatcher dispatcher = new DrainDispatcher(); + + private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext; + + private static RMTimelineCollectorManager rmTimelineCollectorManager; + + @BeforeClass + public static void setup() throws Exception { + if (testRootDir.exists()) { + //cleanup before hand + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + + RMContext rmContext = mock(RMContext.class); + rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>(); + when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext); + rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext); + when(rmContext.getRMTimelineCollectorManager()).thenReturn( + rmTimelineCollectorManager); + + Configuration conf = getTimelineV2Conf(); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + rmTimelineCollectorManager.init(conf); + rmTimelineCollectorManager.start(); + + dispatcher.init(conf); + dispatcher.start(); + metricsPublisher = new TimelineServiceV2Publisher(rmContext) { + @Override + protected Dispatcher getDispatcher() { + return dispatcher; + } + }; + metricsPublisher.init(conf); + metricsPublisher.start(); + } + + @AfterClass + public static void tearDown() throws Exception { + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + if (rmTimelineCollectorManager != null) { + rmTimelineCollectorManager.stop(); + } + if (metricsPublisher != null) { + metricsPublisher.stop(); + } + } + + private static Configuration getTimelineV2Conf() { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2); + conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, + true); + try { + conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, + testRootDir.getCanonicalPath()); + } catch (IOException e) { + e.printStackTrace(); + Assert + .fail("Exception while setting the " + + "TIMELINE_SERVICE_STORAGE_DIR_ROOT "); + } + return conf; + } + + @Test + public void testSystemMetricPublisherInitialization() { + @SuppressWarnings("resource") + TimelineServiceV2Publisher publisher = + new TimelineServiceV2Publisher(mock(RMContext.class)); + try { + Configuration conf = getTimelineV2Conf(); + conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, + YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_EVENTS_ENABLED); + publisher.init(conf); + assertFalse( + "Default configuration should not publish container events from RM", + publisher.isPublishContainerEvents()); + + publisher.stop(); + + publisher = new TimelineServiceV2Publisher(mock(RMContext.class)); + conf = getTimelineV2Conf(); + publisher.init(conf); + assertTrue("Expected to have registered event handlers and set ready to " + + "publish events after init", + publisher.isPublishContainerEvents()); + publisher.start(); + assertTrue("Expected to publish container events from RM", + publisher.isPublishContainerEvents()); + } finally { + publisher.stop(); + } + } + + @Test(timeout = 10000) + public void testPublishApplicationMetrics() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + RMApp app = createAppAndRegister(appId); + + metricsPublisher.appCreated(app, app.getStartTime()); + metricsPublisher.appACLsUpdated(app, "user1,user2", 4L); + metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); + dispatcher.await(); + + String outputDirApp = + getTimelineEntityDir(app) + "/" + TimelineEntityType.YARN_APPLICATION + + "/"; + + File entityFolder = new File(outputDirApp); + Assert.assertTrue(entityFolder.isDirectory()); + + // file name is <entityId>.thist + String timelineServiceFileName = + appId.toString() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appFile = new File(outputDirApp, timelineServiceFileName); + Assert.assertTrue(appFile.exists()); + verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE); + } + + @Test(timeout = 10000) + public void testPublishAppAttemptMetrics() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + RMApp app = rmAppsMapInContext.get(appId); + if (app == null) { + app = createAppAndRegister(appId); + } + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId); + metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L); + when(app.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, + app, Integer.MAX_VALUE + 2L); + + dispatcher.await(); + + String outputDirApp = + getTimelineEntityDir(app) + "/" + + TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/"; + + File entityFolder = new File(outputDirApp); + Assert.assertTrue(entityFolder.isDirectory()); + + // file name is <entityId>.thist + String timelineServiceFileName = + appAttemptId.toString() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appFile = new File(outputDirApp, timelineServiceFileName); + Assert.assertTrue(appFile.exists()); + verifyEntity(appFile, 2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + } + + @Test(timeout = 10000) + public void testPublishContainerMetrics() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + RMApp app = rmAppsMapInContext.get(appId); + if (app == null) { + app = createAppAndRegister(appId); + } + ContainerId containerId = + ContainerId.newContainerId(ApplicationAttemptId.newInstance( + appId, 1), 1); + RMContainer container = createRMContainer(containerId); + metricsPublisher.containerCreated(container, container.getCreationTime()); + metricsPublisher.containerFinished(container, container.getFinishTime()); + dispatcher.await(); + + String outputDirApp = + getTimelineEntityDir(app) + "/" + + TimelineEntityType.YARN_CONTAINER + "/"; + + File entityFolder = new File(outputDirApp); + Assert.assertTrue(entityFolder.isDirectory()); + + // file name is <entityId>.thist + String timelineServiceFileName = + containerId.toString() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appFile = new File(outputDirApp, timelineServiceFileName); + Assert.assertTrue(appFile.exists()); + verifyEntity(appFile, 2, + ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); + } + + private RMApp createAppAndRegister(ApplicationId appId) { + RMApp app = createRMApp(appId); + + // some stuff which are currently taken care in RMAppImpl + rmAppsMapInContext.putIfAbsent(appId, app); + AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId); + rmTimelineCollectorManager.putIfAbsent(appId, collector); + return app; + } + + private static void verifyEntity(File entityFile, long expectedEvents, + String eventForCreatedTime) throws IOException { + BufferedReader reader = null; + String strLine; + long count = 0; + try { + reader = new BufferedReader(new FileReader(entityFile)); + while ((strLine = reader.readLine()) != null) { + if (strLine.trim().length() > 0) { + TimelineEntity entity = FileSystemTimelineReaderImpl. + getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class); + for (TimelineEvent event : entity.getEvents()) { + if (event.getId().equals(eventForCreatedTime)) { + assertTrue(entity.getCreatedTime() > 0); + break; + } + } + count++; + } + } + } finally { + reader.close(); + } + assertEquals("Expected " + expectedEvents + " events to be published", + count, expectedEvents); + } + + private String getTimelineEntityDir(RMApp app) { + String outputDirApp = + testRootDir.getAbsolutePath() + "/" + + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/" + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + + app.getUser() + "/" + + app.getName() + "/" + + TimelineUtils.DEFAULT_FLOW_VERSION + "/" + + app.getStartTime() + "/" + + app.getApplicationId(); + return outputDirApp; + } + + private static RMApp createRMApp(ApplicationId appId) { + RMApp app = mock(RMAppImpl.class); + when(app.getApplicationId()).thenReturn(appId); + when(app.getName()).thenReturn("test app"); + when(app.getApplicationType()).thenReturn("test app type"); + when(app.getUser()).thenReturn("testUser"); + when(app.getQueue()).thenReturn("test queue"); + when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L); + when(app.getDiagnostics()).thenReturn( + new StringBuilder("test diagnostics info")); + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn( + ApplicationAttemptId.newInstance(appId, 1)); + when(app.getCurrentAppAttempt()).thenReturn(appAttempt); + when(app.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + when(app.getRMAppMetrics()).thenReturn( + new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE)); + when(app.getApplicationTags()).thenReturn(Collections.<String> emptySet()); + ApplicationSubmissionContext appSubmissionContext = + mock(ApplicationSubmissionContext.class); + when(appSubmissionContext.getPriority()) + .thenReturn(Priority.newInstance(0)); + when(app.getApplicationSubmissionContext()) + .thenReturn(appSubmissionContext); + return app; + } + + private static RMAppAttempt createRMAppAttempt( + ApplicationAttemptId appAttemptId) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId); + when(appAttempt.getHost()).thenReturn("test host"); + when(appAttempt.getRpcPort()).thenReturn(-100); + Container container = mock(Container.class); + when(container.getId()).thenReturn( + ContainerId.newContainerId(appAttemptId, 1)); + when(appAttempt.getMasterContainer()).thenReturn(container); + when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info"); + when(appAttempt.getTrackingUrl()).thenReturn("test tracking url"); + when(appAttempt.getOriginalTrackingUrl()).thenReturn( + "test original tracking url"); + return appAttempt; + } + + private static RMContainer createRMContainer(ContainerId containerId) { + RMContainer container = mock(RMContainer.class); + when(container.getContainerId()).thenReturn(containerId); + when(container.getAllocatedNode()).thenReturn( + NodeId.newInstance("test host", -100)); + when(container.getAllocatedResource()).thenReturn( + Resource.newInstance(-1, -1)); + when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED); + when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info"); + when(container.getContainerExitStatus()).thenReturn(-1); + when(container.getContainerState()).thenReturn(ContainerState.COMPLETE); + Container mockContainer = mock(Container.class); + when(container.getContainer()).thenReturn(mockContainer); + when(mockContainer.getNodeHttpAddress()) + .thenReturn("http://localhost:1234"); + return container; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 35fd0c3..62a5c52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -301,4 +301,18 @@ public class MockRMApp implements RMApp { public CallerContext getCallerContext() { throw new UnsupportedOperationException("Not supported yet."); } + + public String getCollectorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void removeCollectorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void setCollectorAddr(String collectorAddr) { + throw new UnsupportedOperationException("Not supported yet."); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index bb5c25c..9131643 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -225,6 +226,9 @@ public class TestRMAppTransitions { .getAppResourceUsageReport((ApplicationAttemptId)Matchers.any()); doReturn(resourceScheduler).when(rmContext).getScheduler(); + doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) + .getRMTimelineCollectorManager(); + rmDispatcher.register(RMAppAttemptEventType.class, new TestApplicationAttemptEventDispatcher(this.rmContext)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml index eb752d1..e3e58d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml @@ -56,6 +56,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> <groupId>org.apache.hadoop</groupId> @@ -88,6 +93,18 @@ <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + </dependency> + <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minikdc</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 0f239c0..67b652b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.Shell; @@ -544,6 +545,8 @@ public class MiniYARNCluster extends CompositeService { MiniYARNCluster.getHostname() + ":0"); config.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, MiniYARNCluster.getHostname() + ":0"); + config.set(YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, + MiniYARNCluster.getHostname() + ":0"); WebAppUtils .setNMWebAppHostNameAndPort(config, MiniYARNCluster.getHostname(), 0); @@ -782,8 +785,8 @@ public class MiniYARNCluster extends CompositeService { if (!useFixedPorts) { String hostname = MiniYARNCluster.getHostname(); conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, hostname - + ":0"); + conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + hostname + ":" + ServerSocketUtil.getPort(9188, 10)); } appHistoryServer.init(conf); super.serviceInit(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java index e67a236..9226ead 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java @@ -74,8 +74,6 @@ public class TestMiniYarnCluster { String hostname = MiniYARNCluster.getHostname(); Assert.assertEquals(hostname + ":0", conf.get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS)); - Assert.assertEquals(hostname + ":0", - conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS)); cluster.start(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java new file mode 100644 index 0000000..5a63547 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice; + + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestTimelineServiceClientIntegration { + private static NodeTimelineCollectorManager collectorManager; + private static PerNodeTimelineCollectorsAuxService auxService; + private static Configuration conf; + + @BeforeClass + public static void setupClass() throws Exception { + try { + collectorManager = new MockNodeTimelineCollectorManager(); + conf = new YarnConfiguration(); + // enable timeline service v.2 + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + auxService = + PerNodeTimelineCollectorsAuxService.launchServer(new String[0], + collectorManager, conf); + auxService.addApplication(ApplicationId.newInstance(0, 1)); + } catch (ExitUtil.ExitException e) { + fail(); + } + } + + @AfterClass + public static void tearDownClass() throws Exception { + if (auxService != null) { + auxService.stop(); + } + } + + @Test + public void testPutEntities() throws Exception { + TimelineClient client = + TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1)); + try { + // set the timeline service address manually + client.setTimelineServiceAddress( + collectorManager.getRestServerBindAddress()); + client.init(conf); + client.start(); + TimelineEntity entity = new TimelineEntity(); + entity.setType("test entity type"); + entity.setId("test entity id"); + TimelineMetric metric = + new TimelineMetric(TimelineMetric.Type.TIME_SERIES); + metric.setId("test metric id"); + metric.addValue(1L, 1.0D); + metric.addValue(2L, 2.0D); + entity.addMetric(metric); + client.putEntities(entity); + client.putEntitiesAsync(entity); + } finally { + client.stop(); + } + } + + @Test + public void testPutExtendedEntities() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + TimelineClient client = + TimelineClient.createTimelineClient(appId); + try { + // set the timeline service address manually + client.setTimelineServiceAddress( + collectorManager.getRestServerBindAddress()); + client.init(conf); + client.start(); + ClusterEntity cluster = new ClusterEntity(); + cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + FlowRunEntity flow = new FlowRunEntity(); + flow.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); + flow.setName("test_flow_name"); + flow.setVersion("test_flow_version"); + flow.setRunId(1L); + flow.setParent(cluster.getType(), cluster.getId()); + ApplicationEntity app = new ApplicationEntity(); + app.setId(appId.toString()); + flow.addChild(app.getType(), app.getId()); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ApplicationAttemptEntity appAttempt = new ApplicationAttemptEntity(); + appAttempt.setId(attemptId.toString()); + ContainerId containerId = ContainerId.newContainerId(attemptId, 1); + ContainerEntity container = new ContainerEntity(); + container.setId(containerId.toString()); + UserEntity user = new UserEntity(); + user.setId(UserGroupInformation.getCurrentUser().getShortUserName()); + QueueEntity queue = new QueueEntity(); + queue.setId("default_queue"); + client.putEntities(cluster, flow, app, appAttempt, container, user, + queue); + client.putEntitiesAsync(cluster, flow, app, appAttempt, container, user, + queue); + } finally { + client.stop(); + } + } + + private static class MockNodeTimelineCollectorManager extends + NodeTimelineCollectorManager { + public MockNodeTimelineCollectorManager() { + super(); + } + + @Override + protected CollectorNodemanagerProtocol getNMCollectorService() { + CollectorNodemanagerProtocol protocol = + mock(CollectorNodemanagerProtocol.class); + try { + GetTimelineCollectorContextResponse response = + GetTimelineCollectorContextResponse.newInstance( + UserGroupInformation.getCurrentUser().getShortUserName(), + "test_flow_name", "test_flow_version", 1L); + when(protocol.getTimelineCollectorContext(any( + GetTimelineCollectorContextRequest.class))).thenReturn(response); + } catch (YarnException | IOException e) { + fail(); + } + return protocol; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org