http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java deleted file mode 100644 index 05b6781..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; - -public class ContainerCreatedEvent extends SystemMetricsEvent { - - private ContainerId containerId; - private Resource allocatedResource; - private NodeId allocatedNode; - private Priority allocatedPriority; - private String nodeHttpAddress; - - public ContainerCreatedEvent( - ContainerId containerId, - Resource allocatedResource, - NodeId allocatedNode, - Priority allocatedPriority, - long createdTime, - String nodeHttpAddress) { - super(SystemMetricsEventType.CONTAINER_CREATED, createdTime); - this.containerId = containerId; - this.allocatedResource = allocatedResource; - this.allocatedNode = allocatedNode; - this.allocatedPriority = allocatedPriority; - this.nodeHttpAddress = nodeHttpAddress; - } - - @Override - public int hashCode() { - return containerId.getApplicationAttemptId().getApplicationId().hashCode(); - } - - public ContainerId getContainerId() { - return containerId; - } - - public Resource getAllocatedResource() { - return allocatedResource; - } - - public NodeId getAllocatedNode() { - return allocatedNode; - } - - public Priority getAllocatedPriority() { - return allocatedPriority; - } - - public String getNodeHttpAddress() { - return nodeHttpAddress; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java deleted file mode 100644 index ca4d311..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.NodeId; - -public class ContainerFinishedEvent extends SystemMetricsEvent { - - private ContainerId containerId; - private String diagnosticsInfo; - private int containerExitStatus; - private ContainerState state; - private NodeId allocatedNode; - - public ContainerFinishedEvent( - ContainerId containerId, - String diagnosticsInfo, - int containerExitStatus, - ContainerState state, - long finishedTime, - NodeId allocatedNode) { - super(SystemMetricsEventType.CONTAINER_FINISHED, finishedTime); - this.containerId = containerId; - this.diagnosticsInfo = diagnosticsInfo; - this.containerExitStatus = containerExitStatus; - this.allocatedNode = allocatedNode; - this.state = state; - } - - @Override - public int hashCode() { - return containerId.getApplicationAttemptId().getApplicationId().hashCode(); - } - - public ContainerId getContainerId() { - return containerId; - } - - public String getDiagnosticsInfo() { - return diagnosticsInfo; - } - - public int getContainerExitStatus() { - return containerExitStatus; - } - - public ContainerState getContainerState() { - return state; - } - - public NodeId getAllocatedNode() { - return allocatedNode; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java new file mode 100644 index 0000000..e0bf3c7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +/** + * This class does nothing when any of the methods are invoked on + * SystemMetricsPublisher. + */ +public class NoOpSystemMetricPublisher implements SystemMetricsPublisher{ + + @Override + public void appCreated(RMApp app, long createdTime) { + } + + @Override + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + } + + @Override + public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { + } + + @Override + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { + } + + @Override + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + } + + @Override + public void containerCreated(RMContainer container, long createdTime) { + } + + @Override + public void containerFinished(RMContainer container, long finishedTime) { + } + + @Override + public void appUpdated(RMApp app, long currentTimeMillis) { + } + + @Override + public void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime) { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java deleted file mode 100644 index 1847396..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.event.AbstractEvent; - -public class SystemMetricsEvent extends AbstractEvent<SystemMetricsEventType> { - - public SystemMetricsEvent(SystemMetricsEventType type) { - super(type); - } - - public SystemMetricsEvent(SystemMetricsEventType type, long timestamp) { - super(type, timestamp); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java deleted file mode 100644 index fcda4b4..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - - -public enum SystemMetricsEventType { - // app events - APP_CREATED, - APP_FINISHED, - APP_ACLS_UPDATED, - APP_UPDATED, - APP_STATE_UPDATED, - - // app attempt events - APP_ATTEMPT_REGISTERED, - APP_ATTEMPT_FINISHED, - - // container events - CONTAINER_CREATED, - CONTAINER_FINISHED -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java index b0b976d..eeea434 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java @@ -18,615 +18,35 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; - -import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; /** - * The class that helps RM publish metrics to the timeline server. RM will - * always invoke the methods of this class regardless the service is enabled or - * not. If it is disabled, publishing requests will be ignored silently. + * Interface used to publish app/container events to timelineservice. */ -@Private -@Unstable -public class SystemMetricsPublisher extends CompositeService { - - private static final Log LOG = LogFactory - .getLog(SystemMetricsPublisher.class); - - private Dispatcher dispatcher; - private TimelineClient client; - private boolean publishSystemMetrics; - - public SystemMetricsPublisher() { - super(SystemMetricsPublisher.class.getName()); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetrics = - conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) && - conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED); - - if (publishSystemMetrics) { - client = TimelineClient.createTimelineClient(); - addIfService(client); - - dispatcher = createDispatcher(conf); - dispatcher.register(SystemMetricsEventType.class, - new ForwardingEventHandler()); - addIfService(dispatcher); - LOG.info("YARN system metrics publishing service is enabled"); - } else { - LOG.info("YARN system metrics publishing service is not enabled"); - } - super.serviceInit(conf); - } - - @SuppressWarnings("unchecked") - public void appCreated(RMApp app, long createdTime) { - if (publishSystemMetrics) { - ApplicationSubmissionContext appSubmissionContext = - app.getApplicationSubmissionContext(); - dispatcher.getEventHandler().handle( - new ApplicationCreatedEvent( - app.getApplicationId(), - app.getName(), - app.getApplicationType(), - app.getUser(), - app.getQueue(), - app.getSubmitTime(), - createdTime, app.getApplicationTags(), - appSubmissionContext.getUnmanagedAM(), - appSubmissionContext.getPriority(), - app.getAppNodeLabelExpression(), - app.getAmNodeLabelExpression(), - app.getCallerContext())); - } - } - - @SuppressWarnings("unchecked") - public void appUpdated(RMApp app, long updatedTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler() - .handle(new ApplicationUpdatedEvent(app.getApplicationId(), - app.getQueue(), updatedTime, - app.getApplicationSubmissionContext().getPriority())); - } - } - - @SuppressWarnings("unchecked") - public void appFinished(RMApp app, RMAppState state, long finishedTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler().handle( - new ApplicationFinishedEvent( - app.getApplicationId(), - app.getDiagnostics().toString(), - app.getFinalApplicationStatus(), - RMServerUtils.createApplicationState(state), - app.getCurrentAppAttempt() == null ? - null : app.getCurrentAppAttempt().getAppAttemptId(), - finishedTime, - app.getRMAppMetrics())); - } - } - - @SuppressWarnings("unchecked") - public void appACLsUpdated(RMApp app, String appViewACLs, - long updatedTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler().handle( - new ApplicationACLsUpdatedEvent( - app.getApplicationId(), - appViewACLs == null ? "" : appViewACLs, - updatedTime)); - } - } - - @SuppressWarnings("unchecked") - public void appStateUpdated(RMApp app, YarnApplicationState appState, - long updatedTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler().handle( - new ApplicaitonStateUpdatedEvent( - app.getApplicationId(), - appState, - updatedTime)); - } - } - - @SuppressWarnings("unchecked") - public void appAttemptRegistered(RMAppAttempt appAttempt, - long registeredTime) { - if (publishSystemMetrics) { - ContainerId container = (appAttempt.getMasterContainer() == null) ? null - : appAttempt.getMasterContainer().getId(); - dispatcher.getEventHandler().handle( - new AppAttemptRegisteredEvent( - appAttempt.getAppAttemptId(), - appAttempt.getHost(), - appAttempt.getRpcPort(), - appAttempt.getTrackingUrl(), - appAttempt.getOriginalTrackingUrl(), - container, - registeredTime)); - } - } - - @SuppressWarnings("unchecked") - public void appAttemptFinished(RMAppAttempt appAttempt, - RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { - if (publishSystemMetrics) { - ContainerId container = (appAttempt.getMasterContainer() == null) ? null - : appAttempt.getMasterContainer().getId(); - dispatcher.getEventHandler().handle( - new AppAttemptFinishedEvent( - appAttempt.getAppAttemptId(), - appAttempt.getTrackingUrl(), - appAttempt.getOriginalTrackingUrl(), - appAttempt.getDiagnostics(), - // app will get the final status from app attempt, or create one - // based on app state if it doesn't exist - app.getFinalApplicationStatus(), - RMServerUtils.createApplicationAttemptState(appAttemtpState), - finishedTime, - container)); - } - } - - @SuppressWarnings("unchecked") - public void containerCreated(RMContainer container, long createdTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler().handle( - new ContainerCreatedEvent( - container.getContainerId(), - container.getAllocatedResource(), - container.getAllocatedNode(), - container.getAllocatedPriority(), - createdTime, container.getNodeHttpAddress())); - } - } - - @SuppressWarnings("unchecked") - public void containerFinished(RMContainer container, long finishedTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler().handle( - new ContainerFinishedEvent( - container.getContainerId(), - container.getDiagnosticsInfo(), - container.getContainerExitStatus(), - container.getContainerState(), - finishedTime, container.getAllocatedNode())); - } - } - - protected Dispatcher createDispatcher(Configuration conf) { - MultiThreadedDispatcher dispatcher = - new MultiThreadedDispatcher( - conf.getInt( - YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, - YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); - dispatcher.setDrainEventsOnStop(); - return dispatcher; - } - - protected void handleSystemMetricsEvent( - SystemMetricsEvent event) { - switch (event.getType()) { - case APP_CREATED: - publishApplicationCreatedEvent((ApplicationCreatedEvent) event); - break; - case APP_FINISHED: - publishApplicationFinishedEvent((ApplicationFinishedEvent) event); - break; - case APP_ACLS_UPDATED: - publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); - break; - case APP_UPDATED: - publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); - break; - case APP_STATE_UPDATED: - publishApplicationStateUpdatedEvent( - (ApplicaitonStateUpdatedEvent)event); - break; - case APP_ATTEMPT_REGISTERED: - publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); - break; - case APP_ATTEMPT_FINISHED: - publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); - break; - case CONTAINER_CREATED: - publishContainerCreatedEvent((ContainerCreatedEvent) event); - break; - case CONTAINER_FINISHED: - publishContainerFinishedEvent((ContainerFinishedEvent) event); - break; - default: - LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); - } - } - - private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - Map<String, Object> entityInfo = new HashMap<String, Object>(); - entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, - event.getApplicationName()); - entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, - event.getApplicationType()); - entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, - event.getUser()); - entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, - event.getSubmittedTime()); - entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, - event.getAppTags()); - entityInfo.put( - ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, - event.isUnmanagedApp()); - entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - event.getApplicationPriority().getPriority()); - entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, - event.getAppNodeLabelsExpression()); - entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, - event.getAmNodeLabelsExpression()); - if (event.getCallerContext() != null) { - if (event.getCallerContext().getContext() != null) { - entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT, - event.getCallerContext().getContext()); - } - if (event.getCallerContext().getSignature() != null) { - entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE, - event.getCallerContext().getSignature()); - } - } - entity.setOtherInfo(entityInfo); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - ApplicationMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map<String, Object> eventInfo = new HashMap<String, Object>(); - eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, - event.getFinalApplicationStatus().toString()); - eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, - event.getYarnApplicationState().toString()); - if (event.getLatestApplicationAttemptId() != null) { - eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, - event.getLatestApplicationAttemptId().toString()); - } - RMAppMetrics appMetrics = event.getAppMetrics(); - entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS, - appMetrics.getVcoreSeconds()); - entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS, - appMetrics.getMemorySeconds()); - - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { - TimelineEntity entity = createApplicationEntity(event.getApplicationId()); - Map<String, Object> eventInfo = new HashMap<String, Object>(); - eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event - .getApplicationPriority().getPriority()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationStateUpdatedEvent( - ApplicaitonStateUpdatedEvent event) { - TimelineEntity entity = createApplicationEntity(event.getApplicationId()); - Map<String, Object> eventInfo = new HashMap<String, Object>(); - eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, - event.getAppState()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationACLsUpdatedEvent( - ApplicationACLsUpdatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - TimelineEvent tEvent = new TimelineEvent(); - Map<String, Object> entityInfo = new HashMap<String, Object>(); - entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, - event.getViewAppACLs()); - entity.setOtherInfo(entityInfo); - tEvent.setEventType( - ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createApplicationEntity( - ApplicationId applicationId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); - entity.setEntityId(applicationId.toString()); - return entity; - } - - private void - publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map<String, Object> eventInfo = new HashMap<String, Object>(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, - event.getHost()); - eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, - event.getRpcPort()); - if (event.getMasterContainerId() != null) { - eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); - } - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map<String, Object> eventInfo = new HashMap<String, Object>(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, - event.getFinalApplicationStatus().toString()); - eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, - event.getYarnApplicationAttemptState().toString()); - if (event.getMasterContainerId() != null) { - eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); - } - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createAppAttemptEntity( - ApplicationAttemptId appAttemptId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType( - AppAttemptMetricsConstants.ENTITY_TYPE); - entity.setEntityId(appAttemptId.toString()); - entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, - appAttemptId.getApplicationId().toString()); - return entity; - } - - private void publishContainerCreatedEvent(ContainerCreatedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - Map<String, Object> entityInfo = new HashMap<String, Object>(); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, - event.getAllocatedResource().getMemorySize()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, - event.getAllocatedResource().getVirtualCores()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, - event.getAllocatedNode().getHost()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, - event.getAllocatedNode().getPort()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, - event.getAllocatedPriority().getPriority()); - entityInfo.put( - ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, - event.getNodeHttpAddress()); - entity.setOtherInfo(entityInfo); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishContainerFinishedEvent(ContainerFinishedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map<String, Object> eventInfo = new HashMap<String, Object>(); - eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, - event.getContainerExitStatus()); - eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, - event.getContainerState().toString()); - Map<String, Object> entityInfo = new HashMap<String, Object>(); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, - event.getAllocatedNode().getHost()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, - event.getAllocatedNode().getPort()); - entity.setOtherInfo(entityInfo); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createContainerEntity( - ContainerId containerId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType( - ContainerMetricsConstants.ENTITY_TYPE); - entity.setEntityId(containerId.toString()); - entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, - containerId.getApplicationAttemptId().toString()); - return entity; - } - - @Private - @VisibleForTesting - public void putEntity(TimelineEntity entity) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Publishing the entity " + entity.getEntityId() + - ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - TimelinePutResponse response = client.putEntities(entity); - List<TimelinePutResponse.TimelinePutError> errors = response.getErrors(); - if (errors.size() == 0) { - LOG.debug("Timeline entities are successfully put"); - } else { - for (TimelinePutResponse.TimelinePutError error : errors) { - LOG.error( - "Error when publishing entity [" + error.getEntityType() + "," - + error.getEntityId() + "], server side error code: " - + error.getErrorCode()); - } - } - } catch (Exception e) { - LOG.error("Error when publishing entity [" + entity.getEntityType() + "," - + entity.getEntityId() + "]", e); - } - } - - /** - * EventHandler implementation which forward events to SystemMetricsPublisher. - * Making use of it, SystemMetricsPublisher can avoid to have a public handle - * method. - */ - private final class ForwardingEventHandler implements - EventHandler<SystemMetricsEvent> { - - @Override - public void handle(SystemMetricsEvent event) { - handleSystemMetricsEvent(event); - } - - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - protected static class MultiThreadedDispatcher extends CompositeService - implements Dispatcher { - - private List<AsyncDispatcher> dispatchers = - new ArrayList<AsyncDispatcher>(); - - public MultiThreadedDispatcher(int num) { - super(MultiThreadedDispatcher.class.getName()); - for (int i = 0; i < num; ++i) { - AsyncDispatcher dispatcher = createDispatcher(); - dispatchers.add(dispatcher); - addIfService(dispatcher); - } - } +public interface SystemMetricsPublisher { - @Override - public EventHandler getEventHandler() { - return new CompositEventHandler(); - } + void appCreated(RMApp app, long createdTime); - @Override - public void register(Class<? extends Enum> eventType, EventHandler handler) { - for (AsyncDispatcher dispatcher : dispatchers) { - dispatcher.register(eventType, handler); - } - } + void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime); - public void setDrainEventsOnStop() { - for (AsyncDispatcher dispatcher : dispatchers) { - dispatcher.setDrainEventsOnStop(); - } - } + void appUpdated(RMApp app, long updatedTime); - private class CompositEventHandler implements EventHandler<Event> { + void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime); - @Override - public void handle(Event event) { - // Use hashCode (of ApplicationId) to dispatch the event to the child - // dispatcher, such that all the writing events of one application will - // be handled by one thread, the scheduled order of the these events - // will be preserved - int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size(); - dispatchers.get(index).getEventHandler().handle(event); - } + void appFinished(RMApp app, RMAppState state, long finishedTime); - } + void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime); - protected AsyncDispatcher createDispatcher() { - return new AsyncDispatcher(); - } + void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime); - } + void containerCreated(RMContainer container, long createdTime); + void containerFinished(RMContainer container, long finishedTime); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java new file mode 100644 index 0000000..7f4ed33 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java @@ -0,0 +1,386 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +/** + * This class is responsible for posting application, appattempt & Container + * lifecycle related events to timeline service v1. + */ +public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher { + + private static final Log LOG = + LogFactory.getLog(TimelineServiceV1Publisher.class); + + public TimelineServiceV1Publisher() { + super("TimelineserviceV1Publisher"); + } + + private TimelineClient client; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + client = TimelineClient.createTimelineClient(); + addIfService(client); + super.serviceInit(conf); + getDispatcher().register(SystemMetricsEventType.class, + new TimelineV1EventHandler()); + } + + @SuppressWarnings("unchecked") + @Override + public void appCreated(RMApp app, long createdTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName()); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + app.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser()); + entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + app.getQueue()); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + app.getSubmitTime()); + entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, + app.getApplicationTags()); + entityInfo.put( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, + app.getApplicationSubmissionContext().getUnmanagedAM()); + entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + app.getApplicationSubmissionContext().getPriority().getPriority()); + entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, + app.getAmNodeLabelExpression()); + entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, + app.getAppNodeLabelExpression()); + if (app.getCallerContext() != null) { + if (app.getCallerContext().getContext() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT, + app.getCallerContext().getContext()); + } + if (app.getCallerContext().getSignature() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE, + app.getCallerContext().getSignature()); + } + } + + entity.setOtherInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(createdTime); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @Override + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + app.getDiagnostics().toString()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + RMServerUtils.createApplicationState(state).toString()); + String latestApplicationAttemptId = app.getCurrentAppAttempt() == null + ? null : app.getCurrentAppAttempt().getAppAttemptId().toString(); + if (latestApplicationAttemptId != null) { + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + latestApplicationAttemptId); + } + RMAppMetrics appMetrics = app.getRMAppMetrics(); + entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS, + appMetrics.getVcoreSeconds()); + entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS, + appMetrics.getMemorySeconds()); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + // sync sending of finish event to avoid possibility of saving application + // finished state in RMStateStore save without publishing in ATS. + putEntity(entity); // sync event so that ATS update is done without fail. + } + + @SuppressWarnings("unchecked") + @Override + public void appUpdated(RMApp app, long updatedTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + app.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + app.getApplicationSubmissionContext().getPriority().getPriority()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); + tEvent.setTimestamp(updatedTime); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + appState); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(updatedTime); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); + TimelineEvent tEvent = new TimelineEvent(); + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + (appViewACLs == null) ? "" : appViewACLs); + entity.setOtherInfo(entityInfo); + tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(updatedTime); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { + TimelineEntity entity = + createAppAttemptEntity(appAttempt.getAppAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(registeredTime); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + appAttempt.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + appAttempt.getOriginalTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, + appAttempt.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + appAttempt.getRpcPort()); + if (appAttempt.getMasterContainer() != null) { + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + appAttempt.getMasterContainer().getId().toString()); + } + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle( + new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); + + } + + @SuppressWarnings("unchecked") + @Override + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + TimelineEntity entity = + createAppAttemptEntity(appAttempt.getAppAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + appAttempt.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + appAttempt.getOriginalTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + appAttempt.getDiagnostics()); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils + .createApplicationAttemptState(appAttemtpState).toString()); + if (appAttempt.getMasterContainer() != null) { + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + appAttempt.getMasterContainer().getId().toString()); + } + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle( + new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void containerCreated(RMContainer container, long createdTime) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + container.getAllocatedResource().getMemorySize()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + container.getAllocatedResource().getVirtualCores()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + container.getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + container.getAllocatedNode().getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + container.getAllocatedPriority().getPriority()); + entityInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + container.getNodeHttpAddress()); + entity.setOtherInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(createdTime); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void containerFinished(RMContainer container, long finishedTime) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + container.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + container.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, + container.getContainerState().toString()); + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + container.getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + container.getAllocatedNode().getPort()); + entity.setOtherInfo(entityInfo); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); + } + + private static TimelineEntity createApplicationEntity( + ApplicationId applicationId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); + entity.setEntityId(applicationId.toString()); + return entity; + } + + private static TimelineEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE); + entity.setEntityId(appAttemptId.toString()); + entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, + appAttemptId.getApplicationId().toString()); + return entity; + } + + private static TimelineEntity createContainerEntity(ContainerId containerId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE); + entity.setEntityId(containerId.toString()); + entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, + containerId.getApplicationAttemptId().toString()); + return entity; + } + + private void putEntity(TimelineEntity entity) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity.getEntityId() + + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + client.putEntities(entity); + } catch (Exception e) { + LOG.error("Error when publishing entity [" + entity.getEntityType() + "," + + entity.getEntityId() + "]", e); + } + } + + private class TimelineV1PublishEvent extends TimelinePublishEvent { + private TimelineEntity entity; + + public TimelineV1PublishEvent(SystemMetricsEventType type, + TimelineEntity entity, ApplicationId appId) { + super(type, appId); + this.entity = entity; + } + + public TimelineEntity getEntity() { + return entity; + } + } + + private class TimelineV1EventHandler + implements EventHandler<TimelineV1PublishEvent> { + @Override + public void handle(TimelineV1PublishEvent event) { + putEntity(event.getEntity()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java new file mode 100644 index 0000000..a248199 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class is responsible for posting application, appattempt & Container + * lifecycle related events to timeline service v2. + */ +@Private +@Unstable +public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher { + private static final Log LOG = + LogFactory.getLog(TimelineServiceV2Publisher.class); + private RMTimelineCollectorManager rmTimelineCollectorManager; + private boolean publishContainerEvents; + + public TimelineServiceV2Publisher(RMContext rmContext) { + super("TimelineserviceV2Publisher"); + rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + getDispatcher().register(SystemMetricsEventType.class, + new TimelineV2EventHandler()); + publishContainerEvents = getConfig().getBoolean( + YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, + YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_EVENTS_ENABLED); + } + + @VisibleForTesting + boolean isPublishContainerEvents() { + return publishContainerEvents; + } + + @SuppressWarnings("unchecked") + @Override + public void appCreated(RMApp app, long createdTime) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + entity.setQueue(app.getQueue()); + entity.setCreatedTime(createdTime); + + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName()); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + app.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser()); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + app.getSubmitTime()); + entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, + app.getApplicationTags()); + entityInfo.put( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, + app.getApplicationSubmissionContext().getUnmanagedAM()); + entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + app.getApplicationSubmissionContext().getPriority().getPriority()); + entity.getConfigs().put( + ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, + app.getAmNodeLabelExpression()); + entity.getConfigs().put( + ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, + app.getAppNodeLabelExpression()); + if (app.getCallerContext() != null) { + if (app.getCallerContext().getContext() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT, + app.getCallerContext().getContext()); + } + if (app.getCallerContext().getSignature() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE, + app.getCallerContext().getSignature()); + } + } + + entity.setInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(createdTime); + entity.addEvent(tEvent); + + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + RMAppMetrics appMetrics = app.getRMAppMetrics(); + entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS, + appMetrics.getVcoreSeconds()); + entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS, + appMetrics.getMemorySeconds()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + app.getDiagnostics().toString()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + RMServerUtils.createApplicationState(state).toString()); + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() == null + ? null : app.getCurrentAppAttempt().getAppAttemptId(); + if (appAttemptId != null) { + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + appAttemptId.toString()); + } + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime) { + ApplicationEntity entity = + createApplicationEntity(app.getApplicationId()); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + appState); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(updatedTime); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + TimelineEvent tEvent = new TimelineEvent(); + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + (appViewACLs == null) ? "" : appViewACLs); + entity.setInfo(entityInfo); + tEvent.setId(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(updatedTime); + entity.addEvent(tEvent); + + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appUpdated(RMApp app, long currentTimeMillis) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + app.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + app.getApplicationSubmissionContext().getPriority().getPriority()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); + tEvent.setTimestamp(currentTimeMillis); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + private static ApplicationEntity createApplicationEntity( + ApplicationId applicationId) { + ApplicationEntity entity = new ApplicationEntity(); + entity.setId(applicationId.toString()); + return entity; + } + + @SuppressWarnings("unchecked") + @Override + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { + TimelineEntity entity = + createAppAttemptEntity(appAttempt.getAppAttemptId()); + entity.setCreatedTime(registeredTime); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(registeredTime); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + appAttempt.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + appAttempt.getOriginalTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, + appAttempt.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + appAttempt.getRpcPort()); + if (appAttempt.getMasterContainer() != null) { + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + appAttempt.getMasterContainer().getId().toString()); + } + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle( + new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + + ApplicationAttemptEntity entity = + createAppAttemptEntity(appAttempt.getAppAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + appAttempt.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + appAttempt.getOriginalTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + appAttempt.getDiagnostics()); + // app will get the final status from app attempt, or create one + // based on app state if it doesn't exist + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils + .createApplicationAttemptState(appAttemtpState).toString()); + if (appAttempt.getMasterContainer() != null) { + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + appAttempt.getMasterContainer().getId().toString()); + } + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle( + new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); + } + + private static ApplicationAttemptEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + ApplicationAttemptEntity entity = new ApplicationAttemptEntity(); + entity.setId(appAttemptId.toString()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(), + appAttemptId.getApplicationId().toString())); + return entity; + } + + @SuppressWarnings("unchecked") + @Override + public void containerCreated(RMContainer container, long createdTime) { + if (publishContainerEvents) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); + entity.setCreatedTime(createdTime); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(createdTime); + // updated as event info instead of entity info, as entity info is updated + // by NM + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + container.getAllocatedResource().getMemorySize()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + container.getAllocatedResource().getVirtualCores()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + container.getAllocatedNode().getHost()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + container.getAllocatedNode().getPort()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + container.getAllocatedPriority().getPriority()); + eventInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + container.getNodeHttpAddress()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); + } + } + + @SuppressWarnings("unchecked") + @Override + public void containerFinished(RMContainer container, long finishedTime) { + if (publishContainerEvents) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + container.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + container.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, + container.getContainerState().toString()); + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + container.getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + container.getAllocatedNode().getPort()); + entity.setInfo(entityInfo); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); + } + } + + private static ContainerEntity createContainerEntity( + ContainerId containerId) { + ContainerEntity entity = new ContainerEntity(); + entity.setId(containerId.toString()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION_ATTEMPT + .name(), containerId.getApplicationAttemptId().toString())); + return entity; + } + + private void putEntity(TimelineEntity entity, ApplicationId appId) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + TimelineCollector timelineCollector = + rmTimelineCollectorManager.get(appId); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + timelineCollector.putEntities(entities, + UserGroupInformation.getCurrentUser()); + } catch (Exception e) { + LOG.error("Error when publishing entity " + entity, e); + } + } + + private class ApplicationFinishPublishEvent extends TimelineV2PublishEvent { + private RMAppImpl app; + + public ApplicationFinishPublishEvent(SystemMetricsEventType type, + TimelineEntity entity, RMAppImpl app) { + super(type, entity, app.getApplicationId()); + this.app = app; + } + + public RMAppImpl getRMAppImpl() { + return app; + } + } + + private class TimelineV2EventHandler + implements EventHandler<TimelineV2PublishEvent> { + @Override + public void handle(TimelineV2PublishEvent event) { + switch (event.getType()) { + case PUBLISH_APPLICATION_FINISHED_ENTITY: + putEntity(event.getEntity(), event.getApplicationId()); + ((ApplicationFinishPublishEvent) event).getRMAppImpl() + .stopTimelineCollector(); + break; + default: + putEntity(event.getEntity(), event.getApplicationId()); + break; + } + } + } + + private class TimelineV2PublishEvent extends TimelinePublishEvent { + private TimelineEntity entity; + + public TimelineV2PublishEvent(SystemMetricsEventType type, + TimelineEntity entity, ApplicationId appId) { + super(type, appId); + this.entity = entity; + } + + public TimelineEntity getEntity() { + return entity; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/package-info.java new file mode 100644 index 0000000..a8a3804 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.resourcemanager.metrics contains + * classes related to publishing app/container events to ATS. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index bb0fc34..98cbd92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -177,6 +177,29 @@ public interface RMApp extends EventHandler<RMAppEvent> { String getTrackingUrl(); /** + * The collector address for the application. It should be used only if the + * timeline service v.2 is enabled. + * + * @return the address for the application's collector, or null if the + * timeline service v.2 is not enabled. + */ + String getCollectorAddr(); + + /** + * Set collector address for the application. It should be used only if the + * timeline service v.2 is enabled. + * + * @param collectorAddr the address of collector + */ + void setCollectorAddr(String collectorAddr); + + /** + * Remove collector address when application is finished or killed. It should + * be used only if the timeline service v.2 is enabled. + */ + void removeCollectorAddr(); + + /** * The original tracking url for the application master. * @return the original tracking url for the application master. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java new file mode 100644 index 0000000..9642911 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * Event used for updating collector address in RMApp on node heartbeat. + */ +public class RMAppCollectorUpdateEvent extends RMAppEvent { + + private final String appCollectorAddr; + + public RMAppCollectorUpdateEvent(ApplicationId appId, + String appCollectorAddr) { + super(appId, RMAppEventType.COLLECTOR_UPDATE); + this.appCollectorAddr = appCollectorAddr; + } + + public String getAppCollectorAddr(){ + return this.appCollectorAddr; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 668c5e1..2b42638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -31,6 +31,9 @@ public enum RMAppEventType { // Source: Scheduler APP_ACCEPTED, + // TODO add source later + COLLECTOR_UPDATE, + // Source: RMAppAttempt ATTEMPT_REGISTERED, ATTEMPT_UNREGISTERED, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org