YARN-8130 Race condition when container events are published for KILLED applications. (Rohith Sharma K S via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2d00a0c7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2d00a0c7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2d00a0c7 Branch: refs/heads/HDDS-4 Commit: 2d00a0c71b5dde31e2cf8fcb96d9d541d41fb879 Parents: 6beb25a Author: Haibo Chen <haiboc...@apache.org> Authored: Mon May 14 11:08:42 2018 -0700 Committer: Haibo Chen <haiboc...@apache.org> Committed: Mon May 14 11:08:42 2018 -0700 ---------------------------------------------------------------------- .../timelineservice/NMTimelineEvent.java | 12 ++- .../timelineservice/NMTimelineEventType.java | 3 + .../timelineservice/NMTimelinePublisher.java | 23 +++-- .../TestNMTimelinePublisher.java | 102 ++++++++++++++++--- 4 files changed, 113 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d00a0c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java index f275b37..1ee27d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java @@ 
-18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.timelineservice; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.AbstractEvent; /** @@ -25,11 +26,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent; * timelineservice v2. */ public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> { - public NMTimelineEvent(NMTimelineEventType type) { - super(type); + private ApplicationId appId; + + public NMTimelineEvent(NMTimelineEventType type, ApplicationId appId) { + super(type, System.currentTimeMillis()); + this.appId=appId; } - public NMTimelineEvent(NMTimelineEventType type, long timestamp) { - super(type, timestamp); + public ApplicationId getApplicationId() { + return appId; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d00a0c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java index b4ae45a..5d81c94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java @@ -24,4 +24,7 @@ package org.apache.hadoop.yarn.server.nodemanager.timelineservice; public enum NMTimelineEventType { // 
Publish the NM Timeline entity TIMELINE_ENTITY_PUBLISH, + + // Stop and remove timeline client + STOP_TIMELINE_CLIENT } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d00a0c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 13d5c67..f451726 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -96,7 +96,7 @@ public class NMTimelinePublisher extends CompositeService { @Override protected void serviceInit(Configuration conf) throws Exception { - dispatcher = new AsyncDispatcher("NM Timeline dispatcher"); + dispatcher = createDispatcher(); dispatcher.register(NMTimelineEventType.class, new ForwardingEventHandler()); addIfService(dispatcher); @@ -113,6 +113,10 @@ public class NMTimelinePublisher extends CompositeService { super.serviceInit(conf); } + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher("NM Timeline dispatcher"); + } + @Override protected void serviceStart() throws Exception { super.serviceStart(); @@ -141,6 +145,9 @@ public class NMTimelinePublisher extends CompositeService { putEntity(((TimelinePublishEvent) 
event).getTimelineEntityToPublish(), ((TimelinePublishEvent) event).getApplicationId()); break; + case STOP_TIMELINE_CLIENT: + removeAndStopTimelineClient(event.getApplicationId()); + break; default: LOG.error("Unknown NMTimelineEvent type: " + event.getType()); } @@ -392,20 +399,13 @@ public class NMTimelinePublisher extends CompositeService { } private static class TimelinePublishEvent extends NMTimelineEvent { - private ApplicationId appId; private TimelineEntity entityToPublish; public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) { - super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System - .currentTimeMillis()); - this.appId = appId; + super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, appId); this.entityToPublish = entity; } - public ApplicationId getApplicationId() { - return appId; - } - public TimelineEntity getTimelineEntityToPublish() { return entityToPublish; } @@ -434,6 +434,11 @@ public class NMTimelinePublisher extends CompositeService { } public void stopTimelineClient(ApplicationId appId) { + dispatcher.getEventHandler().handle( + new NMTimelineEvent(NMTimelineEventType.STOP_TIMELINE_CLIENT, appId)); + } + + private void removeAndStopTimelineClient(ApplicationId appId) { TimelineV2Client client = appToClientMap.remove(appId); if (client != null) { client.stop(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d00a0c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java index 43196c7..2585262 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -31,34 +31,47 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.junit.Assert; import org.junit.Test; +import org.junit.After; +import org.junit.Before; public class TestNMTimelinePublisher { private 
static final String MEMORY_ID = "MEMORY"; private static final String CPU_ID = "CPU"; - @Test - public void testContainerResourceUsage() { - Context context = mock(Context.class); - @SuppressWarnings("unchecked") - final DummyTimelineClient timelineClient = new DummyTimelineClient(null); - when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); + private NMTimelinePublisher publisher; + private DummyTimelineClient timelineClient; + private Configuration conf; + private DrainDispatcher dispatcher; - Configuration conf = new Configuration(); + + @Before public void setup() throws Exception { + conf = new Configuration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS, + 3000L); + timelineClient = new DummyTimelineClient(null); + Context context = createMockContext(); + dispatcher = new DrainDispatcher(); - NMTimelinePublisher publisher = new NMTimelinePublisher(context) { + publisher = new NMTimelinePublisher(context) { public void createTimelineClient(ApplicationId appId) { if (!getAppToClientMap().containsKey(appId)) { timelineClient.init(getConfig()); @@ -66,15 +79,73 @@ public class TestNMTimelinePublisher { getAppToClientMap().put(appId, timelineClient); } } + + @Override protected AsyncDispatcher createDispatcher() { + return dispatcher; + } }; publisher.init(conf); publisher.start(); + } + + private Context createMockContext() { + Context context = mock(Context.class); + when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); + return context; + } + + @After public void tearDown() throws Exception { + if (publisher != null) { + publisher.stop(); + } + if (timelineClient != null) { + timelineClient.stop(); + } + } + + @Test public void testPublishContainerFinish() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 2); + ApplicationAttemptId 
appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); + + String diag = "test-diagnostics"; + int exitStatus = 0; + ContainerStatus cStatus = mock(ContainerStatus.class); + when(cStatus.getContainerId()).thenReturn(cId); + when(cStatus.getDiagnostics()).thenReturn(diag); + when(cStatus.getExitStatus()).thenReturn(exitStatus); + long timeStamp = System.currentTimeMillis(); + + ApplicationContainerFinishedEvent finishedEvent = + new ApplicationContainerFinishedEvent(cStatus, timeStamp); + + publisher.createTimelineClient(appId); + publisher.publishApplicationEvent(finishedEvent); + publisher.stopTimelineClient(appId); + dispatcher.await(); + + ContainerEntity cEntity = new ContainerEntity(); + cEntity.setId(cId.toString()); + TimelineEntity[] lastPublishedEntities = + timelineClient.getLastPublishedEntities(); + + Assert.assertNotNull(lastPublishedEntities); + Assert.assertEquals(1, lastPublishedEntities.length); + TimelineEntity entity = lastPublishedEntities[0]; + Assert.assertTrue(cEntity.equals(entity)); + Assert.assertEquals(diag, + entity.getInfo().get(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertEquals(exitStatus, + entity.getInfo().get(ContainerMetricsConstants.EXIT_STATUS_INFO)); + } + + @Test public void testContainerResourceUsage() { ApplicationId appId = ApplicationId.newInstance(0, 1); publisher.createTimelineClient(appId); Container aContainer = mock(Container.class); - when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId( - ApplicationAttemptId.newInstance(appId, 1), - 0L)); + when(aContainer.getContainerId()).thenReturn(ContainerId + .newContainerId(ApplicationAttemptId.newInstance(appId, 1), 0L)); publisher.reportContainerResourceUsage(aContainer, 1024L, 8F); verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8); timelineClient.reset(); @@ -91,7 +162,6 @@ public class TestNMTimelinePublisher { (float) 
ResourceCalculatorProcessTree.UNAVAILABLE); verifyPublishedResourceUsageMetrics(timelineClient, 1024L, ResourceCalculatorProcessTree.UNAVAILABLE); - publisher.stop(); } private void verifyPublishedResourceUsageMetrics( @@ -151,8 +221,12 @@ public class TestNMTimelinePublisher { private TimelineEntity[] lastPublishedEntities; - @Override - public void putEntitiesAsync(TimelineEntity... entities) + @Override public void putEntitiesAsync(TimelineEntity... entities) + throws IOException, YarnException { + this.lastPublishedEntities = entities; + } + + @Override public void putEntities(TimelineEntity... entities) throws IOException, YarnException { this.lastPublishedEntities = entities; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org