YARN-8130 Race condition when container events are published for KILLED 
applications. (Rohith Sharma K S via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2d00a0c7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2d00a0c7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2d00a0c7

Branch: refs/heads/HDDS-4
Commit: 2d00a0c71b5dde31e2cf8fcb96d9d541d41fb879
Parents: 6beb25a
Author: Haibo Chen <haiboc...@apache.org>
Authored: Mon May 14 11:08:42 2018 -0700
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Mon May 14 11:08:42 2018 -0700

----------------------------------------------------------------------
 .../timelineservice/NMTimelineEvent.java        |  12 ++-
 .../timelineservice/NMTimelineEventType.java    |   3 +
 .../timelineservice/NMTimelinePublisher.java    |  23 +++--
 .../TestNMTimelinePublisher.java                | 102 ++++++++++++++++---
 4 files changed, 113 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d00a0c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
index f275b37..1ee27d2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 
 /**
@@ -25,11 +26,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
  * timelineservice v2.
  */
 public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> {
-  public NMTimelineEvent(NMTimelineEventType type) {
-    super(type);
+  private ApplicationId appId;
+
+  public NMTimelineEvent(NMTimelineEventType type, ApplicationId appId) {
+    super(type, System.currentTimeMillis());
+    this.appId=appId;
   }
 
-  public NMTimelineEvent(NMTimelineEventType type, long timestamp) {
-    super(type, timestamp);
+  public ApplicationId getApplicationId() {
+    return appId;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d00a0c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
index b4ae45a..5d81c94 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
@@ -24,4 +24,7 @@ package 
org.apache.hadoop.yarn.server.nodemanager.timelineservice;
 public enum NMTimelineEventType {
   // Publish the NM Timeline entity
   TIMELINE_ENTITY_PUBLISH,
+
+  // Stop and remove timeline client
+  STOP_TIMELINE_CLIENT
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d00a0c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 13d5c67..f451726 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -96,7 +96,7 @@ public class NMTimelinePublisher extends CompositeService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    dispatcher = new AsyncDispatcher("NM Timeline dispatcher");
+    dispatcher = createDispatcher();
     dispatcher.register(NMTimelineEventType.class,
         new ForwardingEventHandler());
     addIfService(dispatcher);
@@ -113,6 +113,10 @@ public class NMTimelinePublisher extends CompositeService {
     super.serviceInit(conf);
   }
 
+  protected AsyncDispatcher createDispatcher() {
+    return new AsyncDispatcher("NM Timeline dispatcher");
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     super.serviceStart();
@@ -141,6 +145,9 @@ public class NMTimelinePublisher extends CompositeService {
       putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(),
           ((TimelinePublishEvent) event).getApplicationId());
       break;
+    case STOP_TIMELINE_CLIENT:
+      removeAndStopTimelineClient(event.getApplicationId());
+      break;
     default:
       LOG.error("Unknown NMTimelineEvent type: " + event.getType());
     }
@@ -392,20 +399,13 @@ public class NMTimelinePublisher extends CompositeService 
{
   }
 
   private static class TimelinePublishEvent extends NMTimelineEvent {
-    private ApplicationId appId;
     private TimelineEntity entityToPublish;
 
     public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) {
-      super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System
-          .currentTimeMillis());
-      this.appId = appId;
+      super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, appId);
       this.entityToPublish = entity;
     }
 
-    public ApplicationId getApplicationId() {
-      return appId;
-    }
-
     public TimelineEntity getTimelineEntityToPublish() {
       return entityToPublish;
     }
@@ -434,6 +434,11 @@ public class NMTimelinePublisher extends CompositeService {
   }
 
   public void stopTimelineClient(ApplicationId appId) {
+    dispatcher.getEventHandler().handle(
+        new NMTimelineEvent(NMTimelineEventType.STOP_TIMELINE_CLIENT, appId));
+  }
+
+  private void removeAndStopTimelineClient(ApplicationId appId) {
     TimelineV2Client client = appToClientMap.remove(appId);
     if (client != null) {
       client.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d00a0c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index 43196c7..2585262 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -31,34 +31,47 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
 
 public class TestNMTimelinePublisher {
   private static final String MEMORY_ID = "MEMORY";
   private static final String CPU_ID = "CPU";
 
-  @Test
-  public void testContainerResourceUsage() {
-    Context context = mock(Context.class);
-    @SuppressWarnings("unchecked")
-    final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
-    when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
+  private NMTimelinePublisher publisher;
+  private DummyTimelineClient timelineClient;
+  private Configuration conf;
+  private DrainDispatcher dispatcher;
 
-    Configuration conf = new Configuration();
+
+  @Before public void setup() throws Exception {
+    conf = new Configuration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
+        3000L);
+    timelineClient = new DummyTimelineClient(null);
+    Context context = createMockContext();
+    dispatcher = new DrainDispatcher();
 
-    NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
+    publisher = new NMTimelinePublisher(context) {
       public void createTimelineClient(ApplicationId appId) {
         if (!getAppToClientMap().containsKey(appId)) {
           timelineClient.init(getConfig());
@@ -66,15 +79,73 @@ public class TestNMTimelinePublisher {
           getAppToClientMap().put(appId, timelineClient);
         }
       }
+
+      @Override protected AsyncDispatcher createDispatcher() {
+        return dispatcher;
+      }
     };
     publisher.init(conf);
     publisher.start();
+  }
+
+  private Context createMockContext() {
+    Context context = mock(Context.class);
+    when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
+    return context;
+  }
+
+  @After public void tearDown() throws Exception {
+    if (publisher != null) {
+      publisher.stop();
+    }
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
+  }
+
+  @Test public void testPublishContainerFinish() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(0, 2);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
+
+    String diag = "test-diagnostics";
+    int exitStatus = 0;
+    ContainerStatus cStatus = mock(ContainerStatus.class);
+    when(cStatus.getContainerId()).thenReturn(cId);
+    when(cStatus.getDiagnostics()).thenReturn(diag);
+    when(cStatus.getExitStatus()).thenReturn(exitStatus);
+    long timeStamp = System.currentTimeMillis();
+
+    ApplicationContainerFinishedEvent finishedEvent =
+        new ApplicationContainerFinishedEvent(cStatus, timeStamp);
+
+    publisher.createTimelineClient(appId);
+    publisher.publishApplicationEvent(finishedEvent);
+    publisher.stopTimelineClient(appId);
+    dispatcher.await();
+
+    ContainerEntity cEntity = new ContainerEntity();
+    cEntity.setId(cId.toString());
+    TimelineEntity[] lastPublishedEntities =
+        timelineClient.getLastPublishedEntities();
+
+    Assert.assertNotNull(lastPublishedEntities);
+    Assert.assertEquals(1, lastPublishedEntities.length);
+    TimelineEntity entity = lastPublishedEntities[0];
+    Assert.assertTrue(cEntity.equals(entity));
+    Assert.assertEquals(diag,
+        entity.getInfo().get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+    Assert.assertEquals(exitStatus,
+        entity.getInfo().get(ContainerMetricsConstants.EXIT_STATUS_INFO));
+  }
+
+  @Test public void testContainerResourceUsage() {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
     publisher.createTimelineClient(appId);
     Container aContainer = mock(Container.class);
-    when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
-        ApplicationAttemptId.newInstance(appId, 1),
-        0L));
+    when(aContainer.getContainerId()).thenReturn(ContainerId
+        .newContainerId(ApplicationAttemptId.newInstance(appId, 1), 0L));
     publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
     verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
     timelineClient.reset();
@@ -91,7 +162,6 @@ public class TestNMTimelinePublisher {
         (float) ResourceCalculatorProcessTree.UNAVAILABLE);
     verifyPublishedResourceUsageMetrics(timelineClient, 1024L,
         ResourceCalculatorProcessTree.UNAVAILABLE);
-    publisher.stop();
   }
 
   private void verifyPublishedResourceUsageMetrics(
@@ -151,8 +221,12 @@ public class TestNMTimelinePublisher {
 
     private TimelineEntity[] lastPublishedEntities;
 
-    @Override
-    public void putEntitiesAsync(TimelineEntity... entities)
+    @Override public void putEntitiesAsync(TimelineEntity... entities)
+        throws IOException, YarnException {
+      this.lastPublishedEntities = entities;
+    }
+
+    @Override public void putEntities(TimelineEntity... entities)
         throws IOException, YarnException {
       this.lastPublishedEntities = entities;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to