Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 b2bc72a05 -> 9f6ed41b9


YARN-3619. ContainerMetrics unregisters during getMetrics and leads to 
ConcurrentModificationException. Contributed by Zhihai Xu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9f6ed41b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9f6ed41b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9f6ed41b

Branch: refs/heads/branch-2.7
Commit: 9f6ed41b952790880e5c6da5c767e06069a37b47
Parents: b2bc72a
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Oct 2 20:19:14 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Oct 2 20:19:14 2015 +0000

----------------------------------------------------------------------
 .../hadoop/metrics2/impl/MetricsSystemImpl.java |  3 +-
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../hadoop/yarn/conf/YarnConfiguration.java     | 10 ++++-
 .../src/main/resources/yarn-default.xml         |  8 ++++
 .../monitor/ContainerMetrics.java               | 38 +++++++++++++----
 .../monitor/ContainersMonitorImpl.java          | 16 +++++--
 .../monitor/TestContainerMetrics.java           | 45 +++++++++++++++++++-
 7 files changed, 107 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6ed41b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
index b7f264b..cbdfdbd 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
@@ -398,7 +398,8 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
    * Sample all the sources for a snapshot of metrics/tags
    * @return  the metrics buffer containing the snapshot
    */
-  synchronized MetricsBuffer sampleMetrics() {
+  @VisibleForTesting
+  public synchronized MetricsBuffer sampleMetrics() {
     collector.clear();
     MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6ed41b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e7452c4..901109f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -91,6 +91,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3727. For better error recovery, check if the directory exists before
     using it for localization. (Zhihai Xu via jlowe)
 
+    YARN-3619. ContainerMetrics unregisters during getMetrics and leads to
+    ConcurrentModificationException (Zhihai Xu via jlowe)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6ed41b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6748df7..c029d90 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -882,7 +882,15 @@ public class YarnConfiguration extends Configuration {
       NM_PREFIX + "container-metrics.period-ms";
   @Private
   public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
-  
+
+  /** The delay time in ms to unregister container metrics after completion. */
+  @Private
+  public static final String NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
+      NM_PREFIX + "container-metrics.unregister-delay-ms";
+  @Private
+  public static final int DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
+      10000;
+
   /** Prefix for all node manager disk health checker configs. */
   private static final String NM_DISK_HEALTH_CHECK_PREFIX =
       "yarn.nodemanager.disk-health-checker.";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6ed41b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index de6906e..7b2be61 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1224,6 +1224,14 @@
     <value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
   </property>
 
+  <property>
+    <description>
+    The delay time in ms to unregister container metrics after completion.
+    </description>
+    <name>yarn.nodemanager.container-metrics.unregister-delay-ms</name>
+    <value>10000</value>
+  </property>
+
   <!--Docker configuration-->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6ed41b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
index ffa72a4..365fe84 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
@@ -90,6 +90,7 @@ public class ContainerMetrics implements MetricsSource {
   private boolean flushOnPeriod = false; // true if period elapsed
   private boolean finished = false; // true if container finished
   private boolean unregister = false; // unregister
+  private long unregisterDelayMs;
   private Timer timer; // lazily initialized
 
   /**
@@ -97,15 +98,21 @@ public class ContainerMetrics implements MetricsSource {
    */
   protected final static Map<ContainerId, ContainerMetrics>
       usageMetrics = new HashMap<>();
+  // Create a timer to unregister container metrics,
+  // whose associated thread runs as a daemon.
+  private final static Timer unregisterContainerMetricsTimer =
+      new Timer("Container metrics unregistration", true);
 
   ContainerMetrics(
-      MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+      MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
+      long delayMs) {
     this.recordInfo =
         info(sourceName(containerId), RECORD_INFO.description());
     this.registry = new MetricsRegistry(recordInfo);
     this.metricsSystem = ms;
     this.containerId = containerId;
     this.flushPeriodMs = flushPeriodMs;
+    this.unregisterDelayMs = delayMs < 0 ? 0 : delayMs;
     scheduleTimerTaskIfRequired();
 
     this.pMemMBsStat = registry.newStat(
@@ -134,17 +141,18 @@ public class ContainerMetrics implements MetricsSource {
   }
 
   public static ContainerMetrics forContainer(
-      ContainerId containerId, long flushPeriodMs) {
+      ContainerId containerId, long flushPeriodMs, long delayMs) {
     return forContainer(
-        DefaultMetricsSystem.instance(), containerId, flushPeriodMs);
+        DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs);
   }
 
   synchronized static ContainerMetrics forContainer(
-      MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+      MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
+      long delayMs) {
     ContainerMetrics metrics = usageMetrics.get(containerId);
     if (metrics == null) {
-      metrics = new ContainerMetrics(
-          ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId);
+      metrics = new ContainerMetrics(ms, containerId, flushPeriodMs,
+          delayMs).tag(RECORD_INFO, containerId);
 
       // Register with the MetricsSystems
       if (ms != null) {
@@ -158,12 +166,15 @@ public class ContainerMetrics implements MetricsSource {
     return metrics;
   }
 
+  synchronized static void unregisterContainerMetrics(ContainerMetrics cm) {
+    cm.metricsSystem.unregisterSource(cm.recordInfo.name());
+    usageMetrics.remove(cm.containerId);
+  }
+
   @Override
   public synchronized void getMetrics(MetricsCollector collector, boolean all) 
{
     //Container goes through registered -> finished -> unregistered.
     if (unregister) {
-      metricsSystem.unregisterSource(recordInfo.name());
-      usageMetrics.remove(containerId);
       return;
     }
 
@@ -185,6 +196,7 @@ public class ContainerMetrics implements MetricsSource {
       timer.cancel();
       timer = null;
     }
+    scheduleTimerTaskForUnregistration();
   }
 
   public void recordMemoryUsage(int memoryMBs) {
@@ -232,4 +244,14 @@ public class ContainerMetrics implements MetricsSource {
       timer.schedule(timerTask, flushPeriodMs);
     }
   }
+
+  private void scheduleTimerTaskForUnregistration() {
+    TimerTask timerTask = new TimerTask() {
+      @Override
+      public void run() {
+        ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this);
+      }
+    };
+    unregisterContainerMetricsTimer.schedule(timerTask, unregisterDelayMs);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6ed41b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 5153051..20d2112 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -54,6 +54,7 @@ public class ContainersMonitorImpl extends AbstractService 
implements
   private MonitoringThread monitoringThread;
   private boolean containerMetricsEnabled;
   private long containerMetricsPeriodMs;
+  private long containerMetricsUnregisterDelayMs;
 
   final List<ContainerId> containersToBeRemoved;
   final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
@@ -116,6 +117,9 @@ public class ContainersMonitorImpl extends AbstractService 
implements
     this.containerMetricsPeriodMs =
         conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
             YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
+    this.containerMetricsUnregisterDelayMs = conf.getLong(
+        YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
+        YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
 
     long configuredPMemForContainers = conf.getLong(
         YarnConfiguration.NM_PMEM_MB,
@@ -379,7 +383,8 @@ public class ContainersMonitorImpl extends AbstractService 
implements
           for (ContainerId containerId : containersToBeRemoved) {
             if (containerMetricsEnabled) {
               ContainerMetrics.forContainer(
-                  containerId, containerMetricsPeriodMs).finished();
+                  containerId, containerMetricsPeriodMs,
+                  containerMetricsUnregisterDelayMs).finished();
             }
             trackingContainers.remove(containerId);
             LOG.info("Stopping resource-monitoring for " + containerId);
@@ -417,7 +422,8 @@ public class ContainersMonitorImpl extends AbstractService 
implements
 
                 if (containerMetricsEnabled) {
                   ContainerMetrics usageMetrics = ContainerMetrics
-                      .forContainer(containerId, containerMetricsPeriodMs);
+                      .forContainer(containerId, containerMetricsPeriodMs,
+                      containerMetricsUnregisterDelayMs);
                   int cpuVcores = ptInfo.getCpuVcores();
                   final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20);
                   final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20);
@@ -464,10 +470,12 @@ public class ContainersMonitorImpl extends 
AbstractService implements
             // Add usage to container metrics
             if (containerMetricsEnabled) {
               ContainerMetrics.forContainer(
-                  containerId, containerMetricsPeriodMs).recordMemoryUsage(
+                  containerId, containerMetricsPeriodMs,
+                  containerMetricsUnregisterDelayMs).recordMemoryUsage(
                   (int) (currentPmemUsage >> 20));
               ContainerMetrics.forContainer(
-                  containerId, containerMetricsPeriodMs).recordCpuUsage
+                  containerId, containerMetricsPeriodMs,
+                  containerMetricsUnregisterDelayMs).recordCpuUsage
                   ((int)cpuUsagePercentPerCore, milliVcoresUsed);
             }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6ed41b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
index c628648..ec06856 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
@@ -22,11 +22,15 @@ import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import org.apache.hadoop.metrics2.impl.MetricsRecords;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -44,7 +48,8 @@ public class TestContainerMetrics {
 
     MetricsCollectorImpl collector = new MetricsCollectorImpl();
     ContainerId containerId = mock(ContainerId.class);
-    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
+    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
+        100, 1);
 
     metrics.recordMemoryUsage(1024);
     metrics.getMetrics(collector, true);
@@ -82,7 +87,8 @@ public class TestContainerMetrics {
 
     MetricsCollectorImpl collector = new MetricsCollectorImpl();
     ContainerId containerId = mock(ContainerId.class);
-    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
+    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
+        100, 1);
 
     int anyPmemLimit = 1024;
     int anyVmemLimit = 2048;
@@ -107,4 +113,39 @@ public class TestContainerMetrics {
 
     collector.clear();
   }
+
+  @Test
+  public void testContainerMetricsFinished() throws InterruptedException {
+    MetricsSystemImpl system = new MetricsSystemImpl();
+    system.init("test");
+    MetricsCollectorImpl collector = new MetricsCollectorImpl();
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 4);
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+    ContainerMetrics metrics1 = ContainerMetrics.forContainer(system,
+        containerId1, 1, 0);
+    ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2);
+    ContainerMetrics metrics2 = ContainerMetrics.forContainer(system,
+        containerId2, 1, 0);
+    ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
+    ContainerMetrics metrics3 = ContainerMetrics.forContainer(system,
+        containerId3, 1, 0);
+    metrics1.finished();
+    metrics2.finished();
+    system.sampleMetrics();
+    system.sampleMetrics();
+    Thread.sleep(100);
+    system.stop();
+    // verify metrics1 is unregistered
+    assertTrue(metrics1 != ContainerMetrics.forContainer(
+        system, containerId1, 1, 0));
+    // verify metrics2 is unregistered
+    assertTrue(metrics2 != ContainerMetrics.forContainer(
+        system, containerId2, 1, 0));
+    // verify metrics3 is still registered
+    assertTrue(metrics3 == ContainerMetrics.forContainer(
+        system, containerId3, 1, 0));
+    system.shutdown();
+  }
 }

Reply via email to