http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index af03907..aaa15bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -108,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; @@ -140,8 +141,11 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import java.io.DataInputStream; import java.io.IOException; @@ -163,8 +167,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import static org.apache.hadoop.service.Service.STATE.STARTED; -import org.apache.hadoop.yarn.util.resource.Resources; - public class ContainerManagerImpl extends CompositeService implements ContainerManager { @@ -205,6 +207,9 @@ public class ContainerManagerImpl extends CompositeService implements private long waitForContainersOnShutdownMillis; + // NM metrics publisher is set only if the timeline service v.2 is enabled + private NMTimelinePublisher nmMetricsPublisher; + public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) { @@ -231,6 +236,15 @@ public class ContainerManagerImpl extends CompositeService implements auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); + // initialize the metrics publisher if the timeline service v.2 is enabled + // and the system publisher is enabled + Configuration conf = context.getConf(); + if (YarnConfiguration.timelineServiceV2Enabled(conf) && + YarnConfiguration.systemMetricsPublisherEnabled(conf)) { + LOG.info("YARN system metrics publishing service is enabled"); + nmMetricsPublisher = 
createNMTimelinePublisher(context); + context.setNMTimelinePublisher(nmMetricsPublisher); + } this.containersMonitor = createContainersMonitor(exec); addService(this.containersMonitor); @@ -238,7 +252,9 @@ public class ContainerManagerImpl extends CompositeService implements new ContainerEventDispatcher()); dispatcher.register(ApplicationEventType.class, createApplicationEventDispatcher()); - dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc); + dispatcher.register(LocalizationEventType.class, + new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc, + nmMetricsPublisher)); dispatcher.register(AuxServicesEventType.class, auxiliaryServices); dispatcher.register(ContainersMonitorEventType.class, containersMonitor); dispatcher.register(ContainersLauncherEventType.class, containersLauncher); @@ -347,6 +363,7 @@ public class ContainerManagerImpl extends CompositeService implements } LOG.info("Recovering application " + appId); + //TODO: Recover flow and flow run ID ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, creds, context, p.getAppLogAggregationInitedTime()); context.getApplications().put(appId, app); @@ -435,6 +452,14 @@ public class ContainerManagerImpl extends CompositeService implements return new SharedCacheUploadService(); } + @VisibleForTesting + protected NMTimelinePublisher createNMTimelinePublisher(Context ctxt) { + NMTimelinePublisher nmTimelinePublisherLocal = + new NMTimelinePublisher(ctxt); + addIfService(nmTimelinePublisherLocal); + return nmTimelinePublisherLocal; + } + protected ContainersLauncher createContainersLauncher(Context context, ContainerExecutor exec) { return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); @@ -959,22 +984,41 @@ public class ContainerManagerImpl extends CompositeService implements try { if (!isServiceStopped()) { // Create the application - Application application = new ApplicationImpl(dispatcher, user, - applicationID, credentials, context); - if 
(null == context.getApplications().putIfAbsent(applicationID, - application)) { - LOG.info("Creating a new application reference for app " - + applicationID); - LogAggregationContext logAggregationContext = - containerTokenIdentifier.getLogAggregationContext(); - Map<ApplicationAccessType, String> appAcls = - container.getLaunchContext().getApplicationACLs(); - context.getNMStateStore().storeApplication(applicationID, - buildAppProto(applicationID, user, credentials, appAcls, - logAggregationContext)); - dispatcher.getEventHandler().handle( - new ApplicationInitEvent(applicationID, appAcls, - logAggregationContext)); + // populate the flow context from the launch context if the timeline + // service v.2 is enabled + FlowContext flowContext = null; + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + String flowName = launchContext.getEnvironment().get( + TimelineUtils.FLOW_NAME_TAG_PREFIX); + String flowVersion = launchContext.getEnvironment().get( + TimelineUtils.FLOW_VERSION_TAG_PREFIX); + String flowRunIdStr = launchContext.getEnvironment().get( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); + long flowRunId = 0L; + if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { + flowRunId = Long.parseLong(flowRunIdStr); + } + flowContext = + new FlowContext(flowName, flowVersion, flowRunId); + } + if (!context.getApplications().containsKey(applicationID)) { + Application application = + new ApplicationImpl(dispatcher, user, flowContext, + applicationID, credentials, context); + if (context.getApplications().putIfAbsent(applicationID, + application) == null) { + LOG.info("Creating a new application reference for app " + + applicationID); + LogAggregationContext logAggregationContext = + containerTokenIdentifier.getLogAggregationContext(); + Map<ApplicationAccessType, String> appAcls = + container.getLaunchContext().getApplicationACLs(); + context.getNMStateStore().storeApplication(applicationID, + buildAppProto(applicationID, user, credentials, appAcls, + 
logAggregationContext)); + dispatcher.getEventHandler().handle(new ApplicationInitEvent( + applicationID, appAcls, logAggregationContext)); + } } this.context.getNMStateStore().storeContainer(containerId, @@ -1314,6 +1358,9 @@ public class ContainerManagerImpl extends CompositeService implements Container c = containers.get(event.getContainerID()); if (c != null) { c.handle(event); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.publishContainerEvent(event); + } } else { LOG.warn("Event " + event + " sent to absent container " + event.getContainerID()); @@ -1322,7 +1369,6 @@ public class ContainerManagerImpl extends CompositeService implements } class ApplicationEventDispatcher implements EventHandler<ApplicationEvent> { - @Override public void handle(ApplicationEvent event) { Application app = @@ -1330,6 +1376,9 @@ public class ContainerManagerImpl extends CompositeService implements event.getApplicationID()); if (app != null) { app.handle(event); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.publishApplicationEvent(event); + } } else { LOG.warn("Event " + event + " sent to absent application " + event.getApplicationID()); @@ -1337,6 +1386,27 @@ public class ContainerManagerImpl extends CompositeService implements } } + private static final class LocalizationEventHandlerWrapper implements + EventHandler<LocalizationEvent> { + + private EventHandler<LocalizationEvent> origLocalizationEventHandler; + private NMTimelinePublisher timelinePublisher; + + LocalizationEventHandlerWrapper(EventHandler<LocalizationEvent> handler, + NMTimelinePublisher publisher) { + this.origLocalizationEventHandler = handler; + this.timelinePublisher = publisher; + } + + @Override + public void handle(LocalizationEvent event) { + origLocalizationEventHandler.handle(event); + if (timelinePublisher != null) { + timelinePublisher.publishLocalizationEvent(event); + } + } + } + @SuppressWarnings("unchecked") @Override public void handle(ContainerManagerEvent event) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java index b1571e9..aee0862 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java @@ -35,4 +35,9 @@ public interface Application extends EventHandler<ApplicationEvent> { ApplicationState getApplicationState(); + String getFlowName(); + + String getFlowVersion(); + + long getFlowRunId(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java index 6b8007f..0a8ffdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java @@ -19,18 +19,24 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; public class ApplicationContainerFinishedEvent extends ApplicationEvent { - private ContainerId containerID; + private ContainerStatus containerStatus; - public ApplicationContainerFinishedEvent( - ContainerId containerID) { - super(containerID.getApplicationAttemptId().getApplicationId(), + public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) { + super(containerStatus.getContainerId().getApplicationAttemptId(). 
+ getApplicationId(), ApplicationEventType.APPLICATION_CONTAINER_FINISHED); - this.containerID = containerID; + this.containerStatus = containerStatus; } public ContainerId getContainerID() { - return this.containerID; + return containerStatus.getContainerId(); } + + public ContainerStatus getContainerStatus() { + return containerStatus; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 5dd22b4..b9197c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -30,6 +30,7 @@ import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext; import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -55,6 +57,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -71,6 +74,8 @@ public class ApplicationImpl implements Application { final Dispatcher dispatcher; final String user; + // flow context is set only if the timeline service v.2 is enabled + private FlowContext flowContext; final ApplicationId appId; final Credentials credentials; Map<ApplicationAccessType, String> applicationACLs; @@ -96,13 +101,35 @@ public class ApplicationImpl implements Application { private final NMStateStoreService appStateStore; public ApplicationImpl(Dispatcher dispatcher, String user, - ApplicationId appId, Credentials credentials, + ApplicationId appId, Credentials credentials, Context context) { + this(dispatcher, user, null, appId, credentials, context, -1L); + } + + public ApplicationImpl(Dispatcher dispatcher, String user, + ApplicationId appId, Credentials credentials, Context context, + long 
recoveredLogInitedTime) { + this(dispatcher, user, null, appId, credentials, context, + recoveredLogInitedTime); + } + + public ApplicationImpl(Dispatcher dispatcher, String user, + FlowContext flowContext, ApplicationId appId, Credentials credentials, Context context, long recoveredLogInitedTime) { this.dispatcher = dispatcher; this.user = user; this.appId = appId; this.credentials = credentials; this.aclsManager = context.getApplicationACLsManager(); + Configuration conf = context.getConf(); + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + if (flowContext == null) { + throw new IllegalArgumentException("flow context cannot be null"); + } + this.flowContext = flowContext; + if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { + context.getNMTimelinePublisher().createTimelineClient(appId); + } + } this.context = context; this.appStateStore = context.getNMStateStore(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -113,8 +140,37 @@ public class ApplicationImpl implements Application { } public ApplicationImpl(Dispatcher dispatcher, String user, - ApplicationId appId, Credentials credentials, Context context) { - this(dispatcher, user, appId, credentials, context, -1); + FlowContext flowContext, ApplicationId appId, + Credentials credentials, Context context) { + this(dispatcher, user, flowContext, appId, credentials, + context, -1); + } + + /** + * Data object that encapsulates the flow context for the application purpose. 
+ */ + public static class FlowContext { + private final String flowName; + private final String flowVersion; + private final long flowRunId; + + public FlowContext(String flowName, String flowVersion, long flowRunId) { + this.flowName = flowName; + this.flowVersion = flowVersion; + this.flowRunId = flowRunId; + } + + public String getFlowName() { + return flowName; + } + + public String getFlowVersion() { + return flowVersion; + } + + public long getFlowRunId() { + return flowRunId; + } } @Override @@ -496,6 +552,20 @@ public class ApplicationImpl implements Application { new LogHandlerAppFinishedEvent(app.appId)); app.context.getNMTokenSecretManager().appFinished(app.getAppId()); + // Remove collectors info for finished apps. + // TODO check we remove related collectors info in failure cases + // (YARN-3038) + Map<ApplicationId, String> registeredCollectors = + app.context.getRegisteredCollectors(); + if (registeredCollectors != null) { + registeredCollectors.remove(app.getAppId()); + } + // stop timelineClient when application get finished. + NMTimelinePublisher nmTimelinePublisher = + app.context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.stopTimelineClient(app.getAppId()); + } } } @@ -556,4 +626,19 @@ public class ApplicationImpl implements Application { this.readLock.unlock(); } } + + @Override + public String getFlowName() { + return flowContext == null ? null : flowContext.getFlowName(); + } + + @Override + public String getFlowVersion() { + return flowContext == null ? null : flowContext.getFlowVersion(); + } + + @Override + public long getFlowRunId() { + return flowContext == null ? 
0L : flowContext.getFlowRunId(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index f8a7e35..448a14c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -23,6 +23,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; @@ -83,4 +84,7 @@ public interface Container extends EventHandler<ContainerEvent> { boolean canRollback(); void commitUpgrade(); + + Priority getPriority(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 8588dc9..cf5a304 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; @@ -72,11 +74,11 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; -import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -84,7 +86,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; @@ -533,6 +534,10 @@ public class ContainerImpl implements Container { } } + public NMTimelinePublisher getNMTimelinePublisher() { + return context.getNMTimelinePublisher(); + } + @Override public String getUser() { this.readLock.lock(); @@ -680,7 +685,10 @@ public class ContainerImpl implements Container { // Inform the application @SuppressWarnings("rawtypes") EventHandler eventHandler = dispatcher.getEventHandler(); - eventHandler.handle(new ApplicationContainerFinishedEvent(containerId)); + + ContainerStatus containerStatus = 
cloneAndGetContainerStatus(); + eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus)); + // Remove the container from the resource-monitor eventHandler.handle(new ContainerStopMonitoringEvent(containerId)); // Tell the logService too @@ -1478,7 +1486,8 @@ public class ContainerImpl implements Container { container.containerMetrics.finished(); } container.sendFinishedEvents(); - //if the current state is NEW it means the CONTAINER_INIT was never + + // if the current state is NEW it means the CONTAINER_INIT was never // sent for the event, thus no need to send the CONTAINER_STOP if (container.getCurrentState() != org.apache.hadoop.yarn.api.records.ContainerState.NEW) { @@ -1709,4 +1718,8 @@ public class ContainerImpl implements Container { public void commitUpgrade() { this.reInitContext = null; } + + public Priority getPriority() { + return containerTokenIdentifier.getPriority(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index aa276c3..7db1150 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -20,8 +20,13 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; @@ -35,15 +40,14 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import java.util.Arrays; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; public class ContainersMonitorImpl extends AbstractService implements ContainersMonitor { @@ -81,6 +85,11 @@ public class ContainersMonitorImpl extends AbstractService implements private static final long UNKNOWN_MEMORY_LIMIT = -1L; private int nodeCpuPercentageForYARN; + @Private + public static enum 
ContainerMetric { + CPU, MEMORY + } + private ResourceUtilization containersUtilization; // Tracks the aggregated allocation of the currently allocated containers // when queuing of containers at the NMs is enabled. @@ -430,7 +439,9 @@ public class ContainersMonitorImpl extends AbstractService implements } ResourceCalculatorProcessTree pt = - ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf); + ResourceCalculatorProcessTree. + getResourceCalculatorProcessTree( + pId, processTreeClass, conf); ptInfo.setPid(pId); ptInfo.setProcessTree(pt); @@ -467,6 +478,7 @@ public class ContainersMonitorImpl extends AbstractService implements pTree.updateProcessTree(); // update process-tree long currentVmemUsage = pTree.getVirtualMemorySize(); long currentPmemUsage = pTree.getRssMemorySize(); + // if machine has 6 cores and 3 are used, // cpuUsagePercentPerCore should be 300% and // cpuUsageTotalCoresPercentage should be 50% @@ -573,10 +585,19 @@ public class ContainersMonitorImpl extends AbstractService implements trackingContainers.remove(containerId); LOG.info("Removed ProcessTree with root " + pId); } + + ContainerImpl container = + (ContainerImpl) context.getContainers().get(containerId); + NMTimelinePublisher nmMetricsPublisher = + container.getNMTimelinePublisher(); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.reportContainerResourceUsage(container, + currentPmemUsage, cpuUsagePercentPerCore); + } } catch (Exception e) { // Log the exception and proceed to the next container. 
- LOG.warn("Uncaught exception in ContainerMemoryManager " - + "while managing memory of " + containerId, e); + LOG.warn("Uncaught exception in ContainersMonitorImpl " + + "while monitoring resource of " + containerId, e); } } if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java new file mode 100644 index 0000000..f275b37 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +/** + * Event posted to NMTimelinePublisher which in turn publishes it to + * timelineservice v2. + */ +public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> { + public NMTimelineEvent(NMTimelineEventType type) { + super(type); + } + + public NMTimelineEvent(NMTimelineEventType type, long timestamp) { + super(type, timestamp); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java new file mode 100644 index 0000000..b4ae45a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +/** + * Type of {@link NMTimelineEvent}. + */ +public enum NMTimelineEventType { + // Publish the NM Timeline entity + TIMELINE_ENTITY_PUBLISH, +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java new file mode 100644 index 0000000..3f07a34 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -0,0 +1,405 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import 
org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Metrics publisher service that publishes data to the timeline service v.2. It + * is used only if the timeline service v.2 is enabled and the system publishing + * of events and metrics is enabled. 
+ */ +public class NMTimelinePublisher extends CompositeService { + + private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class); + + private Dispatcher dispatcher; + + private Context context; + + private NodeId nodeId; + + private String httpAddress; + + private final Map<ApplicationId, TimelineClient> appToClientMap; + + public NMTimelinePublisher(Context context) { + super(NMTimelinePublisher.class.getName()); + this.context = context; + appToClientMap = new ConcurrentHashMap<>(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + dispatcher = new AsyncDispatcher(); + dispatcher.register(NMTimelineEventType.class, + new ForwardingEventHandler()); + addIfService(dispatcher); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + // context will be updated after containerManagerImpl is started + // hence NMMetricsPublisher is added subservice of containerManagerImpl + this.nodeId = context.getNodeId(); + this.httpAddress = nodeId.getHost() + ":" + context.getHttpPort(); + } + + @VisibleForTesting + Map<ApplicationId, TimelineClient> getAppToClientMap() { + return appToClientMap; + } + + protected void handleNMTimelineEvent(NMTimelineEvent event) { + switch (event.getType()) { + case TIMELINE_ENTITY_PUBLISH: + putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(), + ((TimelinePublishEvent) event).getApplicationId()); + break; + default: + LOG.error("Unknown NMTimelineEvent type: " + event.getType()); + } + } + + public void reportContainerResourceUsage(Container container, Long pmemUsage, + Float cpuUsagePercentPerCore) { + if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || + cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) { + ContainerEntity entity = + createContainerEntity(container.getContainerId()); + long currentTimeMillis = System.currentTimeMillis(); + if (pmemUsage != 
ResourceCalculatorProcessTree.UNAVAILABLE) { + TimelineMetric memoryMetric = new TimelineMetric(); + memoryMetric.setId(ContainerMetric.MEMORY.toString()); + memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM); + memoryMetric.addValue(currentTimeMillis, pmemUsage); + entity.addMetric(memoryMetric); + } + if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) { + TimelineMetric cpuMetric = new TimelineMetric(); + cpuMetric.setId(ContainerMetric.CPU.toString()); + // TODO: support average + cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM); + cpuMetric.addValue(currentTimeMillis, + Math.round(cpuUsagePercentPerCore)); + entity.addMetric(cpuMetric); + } + ApplicationId appId = container.getContainerId().getApplicationAttemptId() + .getApplicationId(); + try { + // no need to put it as part of publisher as timeline client already has + // Queuing concept + TimelineClient timelineClient = getTimelineClient(appId); + if (timelineClient != null) { + timelineClient.putEntitiesAsync(entity); + } else { + LOG.error("Seems like client has been removed before the container" + + " metric could be published for " + container.getContainerId()); + } + } catch (IOException | YarnException e) { + LOG.error("Failed to publish Container metrics for container " + + container.getContainerId(), e); + } + } + } + + @SuppressWarnings("unchecked") + private void publishContainerCreatedEvent(ContainerEvent event) { + ContainerId containerId = event.getContainerID(); + ContainerEntity entity = createContainerEntity(containerId); + Container container = context.getContainers().get(containerId); + Resource resource = container.getResource(); + + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + resource.getMemory()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + resource.getVirtualCores()); + 
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + nodeId.getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + nodeId.getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + container.getPriority().toString()); + entityInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + httpAddress); + entity.setInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + entity.addEvent(tEvent); + entity.setCreatedTime(event.getTimestamp()); + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); + } + + @SuppressWarnings("unchecked") + private void publishContainerFinishedEvent(ContainerStatus containerStatus, + long timeStamp) { + ContainerId containerId = containerStatus.getContainerId(); + TimelineEntity entity = createContainerEntity(containerId); + + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + containerStatus.getDiagnostics()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + containerStatus.getExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, containerStatus + .getState().toString()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(timeStamp); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); + } + + private void publishContainerLocalizationEvent( + ContainerLocalizationEvent event, String eventType) { + Container container = event.getContainer(); + ContainerId containerId = container.getContainerId(); + 
TimelineEntity entity = createContainerEntity(containerId); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(eventType); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + + ApplicationId appId = + container.getContainerId().getApplicationAttemptId().getApplicationId(); + try { + // no need to put it as part of publisher as timeline client already has + // Queuing concept + TimelineClient timelineClient = getTimelineClient(appId); + if (timelineClient != null) { + timelineClient.putEntitiesAsync(entity); + } else { + LOG.error("Seems like client has been removed before the event could be" + + " published for " + container.getContainerId()); + } + } catch (IOException | YarnException e) { + LOG.error("Failed to publish Container metrics for container " + + container.getContainerId(), e); + } + } + + private static ContainerEntity createContainerEntity( + ContainerId containerId) { + ContainerEntity entity = new ContainerEntity(); + entity.setId(containerId.toString()); + Identifier parentIdentifier = new Identifier(); + parentIdentifier + .setType(TimelineEntityType.YARN_APPLICATION_ATTEMPT.name()); + parentIdentifier.setId(containerId.getApplicationAttemptId().toString()); + entity.setParent(parentIdentifier); + return entity; + } + + private void putEntity(TimelineEntity entity, ApplicationId appId) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + TimelineClient timelineClient = getTimelineClient(appId); + if (timelineClient != null) { + timelineClient.putEntities(entity); + } else { + LOG.error("Seems like client has been removed before the entity " + + "could be published for " + entity); + } + } catch (Exception e) { + LOG.error("Error when publishing entity " + entity, e); + } + } + + public void publishApplicationEvent(ApplicationEvent event) { + // publish only when the desired event is received + 
switch (event.getType()) { + case INIT_APPLICATION: + case FINISH_APPLICATION: + case APPLICATION_LOG_HANDLING_FAILED: + // TODO need to be handled in future, + // not sure to publish under which entity + break; + case APPLICATION_CONTAINER_FINISHED: + // this is actually used to publish the container Event + ApplicationContainerFinishedEvent evnt = + (ApplicationContainerFinishedEvent) event; + publishContainerFinishedEvent(evnt.getContainerStatus(), + event.getTimestamp()); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug(event.getType() + " is not a desired ApplicationEvent which" + + " needs to be published by NMTimelinePublisher"); + } + break; + } + } + + public void publishContainerEvent(ContainerEvent event) { + // publish only when the desired event is received + switch (event.getType()) { + case INIT_CONTAINER: + publishContainerCreatedEvent(event); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug(event.getType() + + " is not a desired ContainerEvent which needs to be published by" + + " NMTimelinePublisher"); + } + break; + } + } + + public void publishLocalizationEvent(LocalizationEvent event) { + // publish only when the desired event is received + switch (event.getType()) { + case CONTAINER_RESOURCES_LOCALIZED: + publishContainerLocalizationEvent((ContainerLocalizationEvent) event, + ContainerMetricsConstants.LOCALIZATION_FINISHED_EVENT_TYPE); + break; + case LOCALIZE_CONTAINER_RESOURCES: + publishContainerLocalizationEvent((ContainerLocalizationEvent) event, + ContainerMetricsConstants.LOCALIZATION_START_EVENT_TYPE); + break; + default: + if (LOG.isDebugEnabled()) { + LOG.debug(event.getType() + + " is not a desired LocalizationEvent which needs to be published" + + " by NMTimelinePublisher"); + } + break; + } + } + + /** + * EventHandler implementation which forward events to NMMetricsPublisher. + * Making use of it, NMMetricsPublisher can avoid to have a public handle + * method. 
+ */ + private final class ForwardingEventHandler implements + EventHandler<NMTimelineEvent> { + + @Override + public void handle(NMTimelineEvent event) { + handleNMTimelineEvent(event); + } + } + + private static class TimelinePublishEvent extends NMTimelineEvent { + private ApplicationId appId; + private TimelineEntity entityToPublish; + + public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) { + super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System + .currentTimeMillis()); + this.appId = appId; + this.entityToPublish = entity; + } + + public ApplicationId getApplicationId() { + return appId; + } + + public TimelineEntity getTimelineEntityToPublish() { + return entityToPublish; + } + } + + public void createTimelineClient(ApplicationId appId) { + if (!appToClientMap.containsKey(appId)) { + TimelineClient timelineClient = + TimelineClient.createTimelineClient(appId); + timelineClient.init(getConfig()); + timelineClient.start(); + appToClientMap.put(appId, timelineClient); + } + } + + public void stopTimelineClient(ApplicationId appId) { + TimelineClient client = appToClientMap.remove(appId); + if (client != null) { + client.stop(); + } + } + + public void setTimelineServiceAddress(ApplicationId appId, + String collectorAddr) { + TimelineClient client = appToClientMap.get(appId); + if (client != null) { + client.setTimelineServiceAddress(collectorAddr); + } + } + + private TimelineClient getTimelineClient(ApplicationId appId) { + return appToClientMap.get(appId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/package-info.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/package-info.java new file mode 100644 index 0000000..66233fd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.nodemanager.timelineservice contains + * classes related to publishing container events and other NM lifecycle events + * to ATSv2. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index bc205b9..c20bb7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -1219,6 +1219,7 @@ public class TestNodeStatusUpdater { BuilderUtils.newContainerToken(containerId, 0, "host", 1234, "user", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); + Container completedContainer = new ContainerImpl(conf, null, null, null, null, BuilderUtils.newContainerTokenIdentifier(containerToken), http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 579bea9..57a9706 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Records; import org.junit.After; @@ -618,6 +619,11 @@ public abstract class BaseAMRMProxyTest { } @Override + public Map<ApplicationId, String> getRegisteredCollectors() { + return null; + } + + @Override public ConcurrentMap<ContainerId, Container> getContainers() { return null; } @@ -709,5 +715,13 @@ public abstract class BaseAMRMProxyTest { public ContainerExecutor getContainerExecutor() { return null; } + + public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) { + } + + @Override + public NMTimelinePublisher getNMTimelinePublisher() { + return null; + } } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 2d2c294..db44369 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.isA; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -50,7 +50,6 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -95,12 +94,12 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Before; @@ -722,6 +721,12 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { boolean blockNewContainerRequests) { // do nothing } + + @Override + public NMTimelinePublisher + createNMTimelinePublisher(Context context) { + return null; + } }; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 157ba97..05ea036 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -34,7 +34,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -539,7 +541,8 @@ public class TestApplication { new ApplicationACLsManager(conf)); when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr); when(context.getNMStateStore()).thenReturn(stateStoreService); - + when(context.getConf()).thenReturn(conf); + // Setting master key MasterKey masterKey = new MasterKeyPBImpl(); masterKey.setKeyId(123); @@ -550,7 +553,8 @@ public class TestApplication { this.user = user; this.appId = BuilderUtils.newApplicationId(timestamp, id); - app = new ApplicationImpl(dispatcher, this.user, appId, null, context); + app = 
new ApplicationImpl( + dispatcher, this.user, appId, null, context); containers = new ArrayList<Container>(); for (int i = 0; i < numContainers; i++) { Container container = createMockedContainer(this.appId, i); @@ -597,7 +601,7 @@ public class TestApplication { public void containerFinished(int containerNum) { app.handle(new ApplicationContainerFinishedEvent(containers.get( - containerNum).getContainerId())); + containerNum).cloneAndGetContainerStatus())); drainDispatcherEvents(); } @@ -641,6 +645,9 @@ public class TestApplication { when(c.getLaunchContext()).thenReturn(launchContext); when(launchContext.getApplicationACLs()).thenReturn( new HashMap<ApplicationAccessType, String>()); + when(c.cloneAndGetContainerStatus()).thenReturn( + BuilderUtils.newContainerStatus(cId, + ContainerState.NEW, "", 0, Resource.newInstance(1024, 1))); return c; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index bbbaf78..79a8f33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ 
-99,7 +99,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.hamcrest.CoreMatchers; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index 9813a93..88d9688 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -431,4 +431,4 @@ public class TestAppLogAggregatorImpl { return spy(new LogWriter(conf, remoteAppLogFile, ugi)); } } -} \ No newline at end of file +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java new file mode 100644 index 0000000..ae9397a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -0,0 +1,157 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.junit.Assert; +import org.junit.Test; + +public class TestNMTimelinePublisher { + private static final String MEMORY_ID = "MEMORY"; + private static final String CPU_ID = "CPU"; + + @Test + public void testContainerResourceUsage() { + Context context = mock(Context.class); + @SuppressWarnings("unchecked") + final DummyTimelineClient timelineClient = new DummyTimelineClient(); + when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); + when(context.getHttpPort()).thenReturn(0); + NMTimelinePublisher publisher = new NMTimelinePublisher(context) { + public void createTimelineClient(ApplicationId appId) { + if (!getAppToClientMap().containsKey(appId)) { + getAppToClientMap().put(appId, timelineClient); + } + } + }; + publisher.init(new Configuration()); + publisher.start(); + ApplicationId appId = ApplicationId.newInstance(0, 1); + 
publisher.createTimelineClient(appId); + Container aContainer = mock(Container.class); + when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId( + ApplicationAttemptId.newInstance(appId, 1), + 0L)); + publisher.reportContainerResourceUsage(aContainer, 1024L, 8F); + verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8); + timelineClient.reset(); + + publisher.reportContainerResourceUsage(aContainer, 1024L, 0.8F); + verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 1); + timelineClient.reset(); + + publisher.reportContainerResourceUsage(aContainer, 1024L, 0.49F); + verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 0); + timelineClient.reset(); + + publisher.reportContainerResourceUsage(aContainer, 1024L, + (float) ResourceCalculatorProcessTree.UNAVAILABLE); + verifyPublishedResourceUsageMetrics(timelineClient, 1024L, + ResourceCalculatorProcessTree.UNAVAILABLE); + publisher.stop(); + } + + private void verifyPublishedResourceUsageMetrics( + DummyTimelineClient timelineClient, long memoryUsage, int cpuUsage) { + TimelineEntity[] entities = null; + for (int i = 0; i < 10; i++) { + entities = timelineClient.getLastPublishedEntities(); + if (entities != null) { + break; + } + try { + Thread.sleep(150L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + int numberOfResourceMetrics = 0; + numberOfResourceMetrics += + (memoryUsage == ResourceCalculatorProcessTree.UNAVAILABLE) ? 0 : 1; + numberOfResourceMetrics += + (cpuUsage == ResourceCalculatorProcessTree.UNAVAILABLE) ? 
0 : 1; + assertNotNull("entities are expected to be published", entities); + assertEquals("Expected number of metrics notpublished", + numberOfResourceMetrics, entities[0].getMetrics().size()); + Iterator<TimelineMetric> metrics = entities[0].getMetrics().iterator(); + while (metrics.hasNext()) { + TimelineMetric metric = metrics.next(); + Iterator<Entry<Long, Number>> entrySet; + switch (metric.getId()) { + case CPU_ID: + if (cpuUsage == ResourceCalculatorProcessTree.UNAVAILABLE) { + Assert.fail("Not Expecting CPU Metric to be published"); + } + entrySet = metric.getValues().entrySet().iterator(); + assertEquals("CPU usage metric not matching", cpuUsage, + entrySet.next().getValue()); + break; + case MEMORY_ID: + if (memoryUsage == ResourceCalculatorProcessTree.UNAVAILABLE) { + Assert.fail("Not Expecting Memory Metric to be published"); + } + entrySet = metric.getValues().entrySet().iterator(); + assertEquals("Memory usage metric not matching", memoryUsage, + entrySet.next().getValue()); + break; + default: + Assert.fail("Invalid Resource Usage metric"); + break; + } + } + } + + protected static class DummyTimelineClient extends TimelineClientImpl { + private TimelineEntity[] lastPublishedEntities; + + @Override + public void putEntitiesAsync(TimelineEntity... 
entities) + throws IOException, YarnException { + this.lastPublishedEntities = entities; + } + + public TimelineEntity[] getLastPublishedEntities() { + return lastPublishedEntities; + } + + public void reset() { + lastPublishedEntities = null; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java index 4e13010..8feca21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java @@ -39,6 +39,9 @@ public class MockApp implements Application { Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>(); ApplicationState appState; Application app; + private String flowName; + private String flowVersion; + private long flowRunId; public MockApp(int uniqId) { this("mockUser", 1234, uniqId); @@ -55,6 +58,14 @@ public class MockApp implements Application { appState = ApplicationState.NEW; } + public MockApp(String user, long clusterTimeStamp, int uniqId, + String flowName, String flowVersion, long flowRunId) { + this(user, clusterTimeStamp, uniqId); + this.flowName = flowName; + this.flowVersion = flowVersion; + this.flowRunId = flowRunId; + } + public void setState(ApplicationState state) { this.appState = state; } @@ 
-77,4 +88,15 @@ public class MockApp implements Application { public void handle(ApplicationEvent event) {} + public String getFlowName() { + return flowName; + } + + public String getFlowVersion() { + return flowVersion; + } + + public long getFlowRunId() { + return flowRunId; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 5f1aab9..242a674 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -207,6 +208,9 @@ public class MockContainer implements Container { @Override public void commitUpgrade() { + } + public Priority getPriority() { + return Priority.UNDEFINED; } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java index 33a821e..d8b40e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java @@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp; import static org.junit.Assume.assumeTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.BufferedOutputStream; import java.io.File; @@ -47,10 +47,9 @@ import org.apache.hadoop.util.NodeHealthScriptRunner; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import 
org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -63,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index 5353c98..be1dae1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -86,8 +86,9 @@ public class TestNMWebServer { } private int 
startNMWebAppServer(String webAddr) { + Configuration conf = new Configuration(); Context nmContext = new NodeManager.NMContext(null, null, null, null, - null, false, null); + null, false, conf); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { @@ -110,7 +111,6 @@ public class TestNMWebServer { return true; } }; - Configuration conf = new Configuration(); conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath()); NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf); @@ -149,8 +149,9 @@ public class TestNMWebServer { @Test public void testNMWebApp() throws IOException, YarnException { + Configuration conf = new Configuration(); Context nmContext = new NodeManager.NMContext(null, null, null, null, - null, false, null); + null, false, conf); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { @@ -173,7 +174,6 @@ public class TestNMWebServer { return true; } }; - Configuration conf = new Configuration(); conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath()); NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index ff0e0b0..06221e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -158,6 +158,12 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> @@ -172,6 +178,10 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-web-proxy</artifactId> </dependency> <dependency> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
