This is an automated email from the ASF dual-hosted git repository. vrushali pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 27039a2 YARN-9382 Publish container killed, paused and resumed events to ATSv2. Contributed by Abhishek Modi. 27039a2 is described below commit 27039a29ae403398182e615fa5c1d0cb91a54268 Author: Vrushali C <vrush...@apache.org> AuthorDate: Fri Apr 5 12:02:43 2019 -0700 YARN-9382 Publish container killed, paused and resumed events to ATSv2. Contributed by Abhishek Modi. --- .../server/metrics/ContainerMetricsConstants.java | 9 ++ .../timelineservice/NMTimelinePublisher.java | 102 ++++++++++++++- .../timelineservice/TestNMTimelinePublisher.java | 137 +++++++++++++++++++++ 3 files changed, 247 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java index 7d6fc92..8b2fb85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java @@ -35,6 +35,15 @@ public class ContainerMetricsConstants { "YARN_RM_CONTAINER_CREATED"; // Event of this type will be emitted by NM. + public static final String PAUSED_EVENT_TYPE = "YARN_CONTAINER_PAUSED"; + + // Event of this type will be emitted by NM. + public static final String RESUMED_EVENT_TYPE = "YARN_CONTAINER_RESUMED"; + + // Event of this type will be emitted by NM. + public static final String KILLED_EVENT_TYPE = "YARN_CONTAINER_KILLED"; + + // Event of this type will be emitted by NM. public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED"; // Event of this type will be emitted by RM. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index b2d9376..ba57495 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -25,6 +25,9 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -253,6 +256,95 @@ public class NMTimelinePublisher extends CompositeService { } @SuppressWarnings("unchecked") + private void publishContainerResumedEvent( + ContainerEvent event) { + if (publishNMContainerEvents) { + ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event; + ContainerId containerId = resumeEvent.getContainerID(); + ContainerEntity entity = createContainerEntity(containerId); + + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO, + resumeEvent.getDiagnostic()); + entity.setInfo(entityInfo); + + Container container = context.getContainers().get(containerId); + if 
(container != null) { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + long containerStartTime = container.getContainerStartTime(); + entity.addEvent(tEvent); + entity + .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); + } + } + } + + @SuppressWarnings("unchecked") + private void publishContainerPausedEvent( + ContainerEvent event) { + if (publishNMContainerEvents) { + ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event; + ContainerId containerId = pauseEvent.getContainerID(); + ContainerEntity entity = createContainerEntity(containerId); + + Map<String, Object> entityInfo = new HashMap<String, Object>(); + entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO, + pauseEvent.getDiagnostic()); + entity.setInfo(entityInfo); + + Container container = context.getContainers().get(containerId); + if (container != null) { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + long containerStartTime = container.getContainerStartTime(); + entity.addEvent(tEvent); + entity + .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); + } + } + } + + @SuppressWarnings("unchecked") + private void publishContainerKilledEvent( + ContainerEvent event) { + if (publishNMContainerEvents) { + ContainerKillEvent killEvent = (ContainerKillEvent) event; + ContainerId containerId = killEvent.getContainerID(); + ContainerEntity entity = createContainerEntity(containerId); + + Map<String, Object> entityInfo = new HashMap<String, Object>(); + 
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO, + killEvent.getDiagnostic()); + entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO, + killEvent.getContainerExitStatus()); + entity.setInfo(entityInfo); + + Container container = context.getContainers().get(containerId); + if (container != null) { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + long containerStartTime = container.getContainerStartTime(); + entity.addEvent(tEvent); + entity + .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); + } + } + } + + @SuppressWarnings("unchecked") private void publishContainerFinishedEvent(ContainerStatus containerStatus, long containerFinishTime, long containerStartTime) { if (publishNMContainerEvents) { @@ -384,7 +476,15 @@ public class NMTimelinePublisher extends CompositeService { case INIT_CONTAINER: publishContainerCreatedEvent(event); break; - + case KILL_CONTAINER: + publishContainerKilledEvent(event); + break; + case PAUSE_CONTAINER: + publishContainerPausedEvent(event); + break; + case RESUME_CONTAINER: + publishContainerResumedEvent(event); + break; default: LOG.debug("{} is not a desired ContainerEvent which needs to be " + " published by NMTimelinePublisher", event.getType()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java index ae51f85..abd27ff 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -25,7 +25,11 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Iterator; +import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -45,6 +50,10 @@ import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent; +import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.junit.Assert; @@ -94,6 +103,19 @@ public class TestNMTimelinePublisher { private Context createMockContext() { Context context = mock(Context.class); when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); + + ConcurrentMap<ContainerId, Container> containers = + new ConcurrentHashMap<>(); + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); + Container container = mock(Container.class); + when(container.getContainerStartTime()) + .thenReturn(System.currentTimeMillis()); + containers.putIfAbsent(cId, container); + when(context.getContainers()).thenReturn(containers); + return context; } @@ -145,6 +167,121 @@ public class TestNMTimelinePublisher { cId.getContainerId()), entity.getIdPrefix()); } + @Test + public void testPublishContainerPausedEvent() { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); + + ContainerEvent containerEvent = + new ContainerPauseEvent(cId, "test pause"); + + publisher.createTimelineClient(appId); + publisher.publishContainerEvent(containerEvent); + publisher.stopTimelineClient(appId); + dispatcher.await(); + + ContainerEntity cEntity = new ContainerEntity(); + cEntity.setId(cId.toString()); + TimelineEntity[] lastPublishedEntities = + timelineClient.getLastPublishedEntities(); + + Assert.assertNotNull(lastPublishedEntities); + Assert.assertEquals(1, lastPublishedEntities.length); + TimelineEntity entity = lastPublishedEntities[0]; + Assert.assertEquals(cEntity, entity); + + 
NavigableSet<TimelineEvent> events = entity.getEvents(); + Assert.assertEquals(1, events.size()); + Assert.assertEquals(ContainerMetricsConstants.PAUSED_EVENT_TYPE, + events.iterator().next().getId()); + + Map<String, Object> info = entity.getInfo(); + Assert.assertTrue( + info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertEquals("test pause", + info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + } + + @Test + public void testPublishContainerResumedEvent() { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); + + ContainerEvent containerEvent = + new ContainerResumeEvent(cId, "test resume"); + + publisher.createTimelineClient(appId); + publisher.publishContainerEvent(containerEvent); + publisher.stopTimelineClient(appId); + dispatcher.await(); + + ContainerEntity cEntity = new ContainerEntity(); + cEntity.setId(cId.toString()); + TimelineEntity[] lastPublishedEntities = + timelineClient.getLastPublishedEntities(); + + Assert.assertNotNull(lastPublishedEntities); + Assert.assertEquals(1, lastPublishedEntities.length); + TimelineEntity entity = lastPublishedEntities[0]; + Assert.assertEquals(cEntity, entity); + + NavigableSet<TimelineEvent> events = entity.getEvents(); + Assert.assertEquals(1, events.size()); + Assert.assertEquals(ContainerMetricsConstants.RESUMED_EVENT_TYPE, + events.iterator().next().getId()); + + Map<String, Object> info = entity.getInfo(); + Assert.assertTrue( + info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertEquals("test resume", + info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + } + + @Test + public void testPublishContainerKilledEvent() { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = 
ContainerId.newContainerId(appAttemptId, 1); + + ContainerEvent containerEvent = + new ContainerKillEvent(cId, 1, "test kill"); + + publisher.createTimelineClient(appId); + publisher.publishContainerEvent(containerEvent); + publisher.stopTimelineClient(appId); + dispatcher.await(); + + ContainerEntity cEntity = new ContainerEntity(); + cEntity.setId(cId.toString()); + TimelineEntity[] lastPublishedEntities = + timelineClient.getLastPublishedEntities(); + + Assert.assertNotNull(lastPublishedEntities); + Assert.assertEquals(1, lastPublishedEntities.length); + TimelineEntity entity = lastPublishedEntities[0]; + Assert.assertEquals(cEntity, entity); + + NavigableSet<TimelineEvent> events = entity.getEvents(); + Assert.assertEquals(1, events.size()); + Assert.assertEquals(ContainerMetricsConstants.KILLED_EVENT_TYPE, + events.iterator().next().getId()); + + Map<String, Object> info = entity.getInfo(); + Assert.assertTrue( + info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertEquals("test kill", + info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertTrue( + info.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO)); + Assert.assertEquals(1, + info.get(ContainerMetricsConstants.EXIT_STATUS_INFO)); + } + @Test public void testContainerResourceUsage() { ApplicationId appId = ApplicationId.newInstance(0, 1); publisher.createTimelineClient(appId); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org