MAPREDUCE-6829. Add peak memory usage counter for each task. (Miklos Szegedi 
via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c65f884f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c65f884f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c65f884f

Branch: refs/heads/YARN-2915
Commit: c65f884fc7e08118524f8c88737119d8196b4c1b
Parents: 44606aa
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Thu Jan 26 11:08:13 2017 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Thu Jan 26 11:08:13 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/mapred/Task.java     |  24 ++-
 .../apache/hadoop/mapreduce/TaskCounter.java    |   8 +-
 .../counters/FrameworkCounterGroup.java         |   6 +-
 .../hadoop/mapreduce/TaskCounter.properties     |   4 +
 .../org/apache/hadoop/mapred/TestCounters.java  |  31 ++-
 .../apache/hadoop/mapred/TestJobCounters.java   | 188 +++++++++++++++++++
 6 files changed, 256 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 119d6a7..c1ae0ab 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -110,7 +110,11 @@ abstract public class Task implements Writable, 
Configurable {
     CPU_MILLISECONDS,
     PHYSICAL_MEMORY_BYTES,
     VIRTUAL_MEMORY_BYTES,
-    COMMITTED_HEAP_BYTES
+    COMMITTED_HEAP_BYTES,
+    MAP_PHYSICAL_MEMORY_BYTES_MAX,
+    MAP_VIRTUAL_MEMORY_BYTES_MAX,
+    REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
+    REDUCE_VIRTUAL_MEMORY_BYTES_MAX
   }
 
   /**
@@ -964,6 +968,24 @@ abstract public class Task implements Writable, 
Configurable {
     if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
       counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
     }
+
+    if (pMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
+      TaskCounter counter = isMapTask() ?
+          TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX :
+          TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX;
+      Counters.Counter pMemCounter =
+          counters.findCounter(counter);
+      pMemCounter.setValue(Math.max(pMemCounter.getValue(), pMem));
+    }
+
+    if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
+      TaskCounter counter = isMapTask() ?
+          TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX :
+          TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
+      Counters.Counter vMemCounter =
+          counters.findCounter(counter);
+      vMemCounter.setValue(Math.max(vMemCounter.getValue(), vMem));
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java
index 42ef067..0fab96c 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public enum TaskCounter {
-  MAP_INPUT_RECORDS, 
+  MAP_INPUT_RECORDS,
   MAP_OUTPUT_RECORDS,
   MAP_SKIPPED_RECORDS,
   MAP_OUTPUT_BYTES,
@@ -47,5 +47,9 @@ public enum TaskCounter {
   CPU_MILLISECONDS,
   PHYSICAL_MEMORY_BYTES,
   VIRTUAL_MEMORY_BYTES,
-  COMMITTED_HEAP_BYTES
+  COMMITTED_HEAP_BYTES,
+  MAP_PHYSICAL_MEMORY_BYTES_MAX,
+  MAP_VIRTUAL_MEMORY_BYTES_MAX,
+  REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
+  REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
index e78fe2e..b51f528 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
@@ -100,7 +100,11 @@ public abstract class FrameworkCounterGroup<T extends 
Enum<T>,
 
     @Override
     public void increment(long incr) {
-      value += incr;
+      if (key.name().endsWith("_MAX")) {
+        value = value > incr ? value : incr;
+      } else {
+        value += incr;
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties
index d54b980..0fd1028 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties
@@ -37,3 +37,7 @@ COMMITTED_HEAP_BYTES.name=     Total committed heap usage 
(bytes)
 CPU_MILLISECONDS.name=         CPU time spent (ms)
 PHYSICAL_MEMORY_BYTES.name=    Physical memory (bytes) snapshot
 VIRTUAL_MEMORY_BYTES.name=     Virtual memory (bytes) snapshot
+MAP_PHYSICAL_MEMORY_BYTES_MAX.name=   Peak Map Physical memory (bytes)
+MAP_VIRTUAL_MEMORY_BYTES_MAX.name=    Peak Map Virtual memory (bytes)
+REDUCE_PHYSICAL_MEMORY_BYTES_MAX.name=Peak Reduce Physical memory (bytes)
+REDUCE_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Reduce Virtual memory (bytes)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
index 5e2763e..010f0f3 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
@@ -348,7 +348,36 @@ public class TestCounters {
         counterGroup.findCounter("Unknown");
     Assert.assertNull(count2);
   }
-  
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testTaskCounter() {
+    GroupFactory groupFactory = new GroupFactoryForTest();
+    FrameworkGroupFactory frameworkGroupFactory =
+        groupFactory.newFrameworkGroupFactory(TaskCounter.class);
+    Group group = (Group) frameworkGroupFactory.newGroup("TaskCounter");
+
+    FrameworkCounterGroup counterGroup =
+        (FrameworkCounterGroup) group.getUnderlyingGroup();
+
+    org.apache.hadoop.mapreduce.Counter count1 =
+        counterGroup.findCounter(
+            TaskCounter.PHYSICAL_MEMORY_BYTES.toString());
+    Assert.assertNotNull(count1);
+    count1.increment(10);
+    count1.increment(10);
+    Assert.assertEquals(20, count1.getValue()); // regular counters add up
+
+    // Verify that a counter whose name ends in _MAX keeps the largest value
+    org.apache.hadoop.mapreduce.Counter count2 =
+        counterGroup.findCounter(
+            TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX.toString());
+    Assert.assertNotNull(count2);
+    count2.increment(5);
+    count2.increment(10);
+    Assert.assertEquals(10, count2.getValue()); // max(5, 10), not the sum 15
+  }
+
   @Test
   public void testFilesystemCounter() {
     GroupFactory groupFactory = new GroupFactoryForTest();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
index 8b692ca..850c00a 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
@@ -40,10 +40,12 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -751,4 +753,190 @@ public class TestJobCounters {
     }
   }
 
+  /**
+   * Test mapper: emits (token, 1) for every token of the input value.
+   */
+  public static class TokenizerMapper extends
+      org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
+
+    private final static IntWritable ONE = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, ONE);
+      }
+    }
+  }
+
+  /**
+   * Test reducer: sums the values per key and bumps MY_COUNTER_MAX by 100.
+   */
+  public static class IntSumReducer extends
+      org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, 
IntWritable>{
+    /**
+     * Custom (user-defined) test counter whose name ends in _MAX.
+     */
+    public enum Counters { MY_COUNTER_MAX }
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+      context.getCounter(Counters.MY_COUNTER_MAX).increment(100);
+    }
+  }
+
+  /**
+   * Mock resource reporting with fixed values: RSS 1024, virtual 2000, CPU 0.
+   */
+  public static class MockResourceCalculatorProcessTree
+      extends ResourceCalculatorProcessTree {
+
+    public MockResourceCalculatorProcessTree(String root) {
+      super(root);
+    }
+
+    @Override
+    public void updateProcessTree() {
+    }
+
+    @Override
+    public String getProcessTreeDump() {
+      return "";
+    }
+
+    @Override
+    public long getCumulativeCpuTime() {
+      return 0;
+    }
+
+    @Override
+    public boolean checkPidPgrpidForMatch() {
+      return true;
+    }
+
+    @Override
+    public long getRssMemorySize() {
+      return 1024;
+    }
+
+    @Override
+    public long getVirtualMemorySize() {
+      return 2000;
+    }
+
+    @Override
+    public float getCpuUsagePercent() {
+      return 0;
+    }
+  }
+
+  @Test
+  public void testMockResourceCalculatorProcessTree() {
+    ResourceCalculatorProcessTree tree;
+    tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
+        "1", TestJobCounters.MockResourceCalculatorProcessTree.class,
+        new Configuration());
+    assertNotNull(tree); // the factory must be able to build the mock
+  }
+
+  /**
+   * End to end test of maximum counters.
+   * @throws IOException test failed
+   * @throws ClassNotFoundException test failed
+   * @throws InterruptedException test failed
+   */
+  @Test
+  public void testMaxCounter()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    // Create mapreduce cluster
+    MiniMRClientCluster mrCluster = MiniMRClientClusterFactory.create(
+        this.getClass(), 2, new Configuration());
+
+    try {
+      // Setup input and output paths
+      Path rootDir =
+          new Path(System.getProperty("test.build.data", "/tmp"));
+      Path testRootDir = new Path(rootDir, "testMaxCounter");
+      Path testInputDir = new Path(testRootDir, "input");
+      Path testOutputDir = new Path(testRootDir, "output");
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+      fs.mkdirs(testInputDir);
+      Path testInputFile = new Path(testInputDir, "file01");
+      FSDataOutputStream stream =
+          fs.create(testInputFile);
+      stream.writeChars("foo");
+      stream.writeChars("bar");
+      stream.close();
+      fs.delete(testOutputDir, true);
+
+      // Run job (1 mapper, 2 reducers)
+      Configuration conf = new Configuration();
+      conf.setClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
+          MockResourceCalculatorProcessTree.class,
+          ResourceCalculatorProcessTree.class);
+      Job job = Job.getInstance(conf, "word count");
+      job.setJarByClass(WordCount.class);
+      job.setMapperClass(TokenizerMapper.class);
+      job.setCombinerClass(IntSumReducer.class);
+      job.setReducerClass(IntSumReducer.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(IntWritable.class);
+      job.setNumReduceTasks(2); // two reducers, so a sum would differ from max
+      org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+          .addInputPath(job, testInputDir);
+      org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+          .setOutputPath(job, testOutputDir);
+      assertTrue(job.waitForCompletion(true));
+
+      // Verify physical numbers (the mock reports 1024 for every task)
+      org.apache.hadoop.mapreduce.Counter maxMap =
+          job.getCounters().findCounter(
+              TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX);
+      org.apache.hadoop.mapreduce.Counter maxReduce =
+          job.getCounters().findCounter(
+              TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX);
+      org.apache.hadoop.mapreduce.Counter allP =
+          job.getCounters().findCounter(
+              TaskCounter.PHYSICAL_MEMORY_BYTES);
+      assertEquals(1024, maxMap.getValue());
+      assertEquals(1024, maxReduce.getValue());
+      assertEquals(3072, allP.getValue()); // summed: 3 tasks x 1024
+
+      // Verify virtual numbers (the mock reports 2000 for every task)
+      org.apache.hadoop.mapreduce.Counter maxMapV =
+          job.getCounters().findCounter(
+              TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX);
+      org.apache.hadoop.mapreduce.Counter maxReduceV =
+          job.getCounters().findCounter(
+              TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX);
+      org.apache.hadoop.mapreduce.Counter allV =
+          job.getCounters().findCounter(
+              TaskCounter.VIRTUAL_MEMORY_BYTES);
+      assertEquals(2000, maxMapV.getValue());
+      assertEquals(2000, maxReduceV.getValue());
+      assertEquals(6000, allV.getValue()); // summed: 3 tasks x 2000
+
+      // Make sure custom counters are not affected by the _MAX
+      // code in FrameworkCounterGroup
+      org.apache.hadoop.mapreduce.Counter customerCounter =
+          job.getCounters().findCounter(
+              IntSumReducer.Counters.MY_COUNTER_MAX);
+      assertEquals(200, customerCounter.getValue());
+
+      fs.delete(testInputDir, true);
+      fs.delete(testOutputDir, true);
+    } finally {
+      mrCluster.stop();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to