Author: amarrk
Date: Thu Apr 19 04:26:53 2012
New Revision: 1327816

URL: http://svn.apache.org/viewvc?rev=1327816&view=rev
Log:
MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for map 
only jobs. (amarrk)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1327816&r1=1327815&r2=1327816&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Apr 19 04:26:53 2012
@@ -52,6 +52,9 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for 
+                    map only jobs. (amarrk)
+
     MAPREDUCE-4149. [Rumen] Rumen fails to parse certain counter
                     strings. (ravigummadi)
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java?rev=1327816&r1=1327815&r2=1327816&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java Thu Apr 19 04:26:53 2012
@@ -85,10 +85,10 @@ class CompressionEmulationUtil {
     "gridmix.compression-emulation.map-output.compression-ratio";
   
   /**
-   * Configuration property for setting the compression ratio of reduce output.
+   * Configuration property for setting the compression ratio of job output.
    */
-  private static final String GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO = 
-    "gridmix.compression-emulation.reduce-output.compression-ratio";
+  private static final String GRIDMIX_JOB_OUTPUT_COMPRESSION_RATIO = 
+    "gridmix.compression-emulation.job-output.compression-ratio";
   
   /**
    * Default compression ratio.
@@ -434,20 +434,20 @@ class CompressionEmulationUtil {
   }
   
   /**
-   * Set the reduce output data compression ratio in the given configuration.
+   * Set the job output data compression ratio in the given configuration.
    */
-  static void setReduceOutputCompressionEmulationRatio(Configuration conf, 
-                                                       float ratio) {
-    conf.setFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, ratio);
+  static void setJobOutputCompressionEmulationRatio(Configuration conf, 
+                                                    float ratio) {
+    conf.setFloat(GRIDMIX_JOB_OUTPUT_COMPRESSION_RATIO, ratio);
   }
   
   /**
-   * Get the reduce output data compression ratio using the given configuration.
+   * Get the job output data compression ratio using the given configuration.
    * If the compression ratio is not set in the configuration then use the 
    * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
    */
-  static float getReduceOutputCompressionEmulationRatio(Configuration conf) {
-    return conf.getFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, 
+  static float getJobOutputCompressionEmulationRatio(Configuration conf) {
+    return conf.getFloat(GRIDMIX_JOB_OUTPUT_COMPRESSION_RATIO, 
                          DEFAULT_COMPRESSION_RATIO);
   }
   

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java?rev=1327816&r1=1327815&r2=1327816&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java Thu Apr 19 04:26:53 2012
@@ -288,23 +288,23 @@ class LoadJob extends GridmixJob {
       final long[] reduceBytes = split.getOutputBytes();
       final long[] reduceRecords = split.getOutputRecords();
 
-      // enable gridmix map output record for compression
-      final boolean emulateMapOutputCompression = 
-        CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
-        && conf.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
-      float compressionRatio = 1.0f;
-      if (emulateMapOutputCompression) {
-        compressionRatio = 
-          CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf);
-        LOG.info("GridMix is configured to use a compression ratio of " 
-                 + compressionRatio + " for the map output data.");
-        key.setCompressibility(true, compressionRatio);
-        val.setCompressibility(true, compressionRatio);
-      }
-      
       long totalRecords = 0L;
       final int nReduces = ctxt.getNumReduceTasks();
       if (nReduces > 0) {
+        // enable gridmix map output record for compression
+        boolean emulateMapOutputCompression = 
+          CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+          && conf.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
+        float compressionRatio = 1.0f;
+        if (emulateMapOutputCompression) {
+          compressionRatio = 
+            CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf);
+          LOG.info("GridMix is configured to use a compression ratio of " 
+                   + compressionRatio + " for the map output data.");
+          key.setCompressibility(true, compressionRatio);
+          val.setCompressibility(true, compressionRatio);
+        }
+        
         int idx = 0;
         int id = split.getId();
         for (int i = 0; i < nReduces; ++i) {
@@ -332,7 +332,21 @@ class LoadJob extends GridmixJob {
         }
       } else {
         long mapOutputBytes = reduceBytes[0];
-        if (emulateMapOutputCompression) {
+        
+        // enable gridmix job output compression
+        boolean emulateJobOutputCompression = 
+          CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+          && conf.getBoolean(FileOutputFormat.COMPRESS, false);
+
+        if (emulateJobOutputCompression) {
+          float compressionRatio = 
+            CompressionEmulationUtil.getJobOutputCompressionEmulationRatio(conf);
+          LOG.info("GridMix is configured to use a compression ratio of " 
+                   + compressionRatio + " for the job output data.");
+          key.setCompressibility(true, compressionRatio);
+          val.setCompressibility(true, compressionRatio);
+
+          // set the output size accordingly
           mapOutputBytes /= compressionRatio;
         }
         reduces.add(new AvgRecordFactory(mapOutputBytes, reduceRecords[0],
@@ -387,9 +401,13 @@ class LoadJob extends GridmixJob {
     @Override
     public void cleanup(Context context) 
     throws IOException, InterruptedException {
+      LOG.info("Starting the cleanup phase.");
       for (RecordFactory factory : reduces) {
         key.setSeed(r.nextLong());
         while (factory.next(key, val)) {
+          // send the progress update (maybe make this a thread)
+          context.progress();
+          
           context.write(key, val);
           key.setSeed(r.nextLong());
           
@@ -462,7 +480,7 @@ class LoadJob extends GridmixJob {
           && FileOutputFormat.getCompressOutput(context)) {
         float compressionRatio = 
           CompressionEmulationUtil
-            .getReduceOutputCompressionEmulationRatio(conf);
+            .getJobOutputCompressionEmulationRatio(conf);
         LOG.info("GridMix is configured to use a compression ratio of " 
                  + compressionRatio + " for the reduce output data.");
         val.setCompressibility(true, compressionRatio);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java?rev=1327816&r1=1327815&r2=1327816&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java Thu Apr 19 04:26:53 2012
@@ -322,10 +322,9 @@ public class TestCompressionEmulationUti
   public void testOutputCompressionRatioConfiguration() throws Exception {
     Configuration conf = new Configuration();
     float ratio = 0.567F;
-    CompressionEmulationUtil.setReduceOutputCompressionEmulationRatio(conf, 
-                                                                      ratio);
+    CompressionEmulationUtil.setJobOutputCompressionEmulationRatio(conf, ratio);
     assertEquals(ratio, 
-        CompressionEmulationUtil.getReduceOutputCompressionEmulationRatio(conf),
+        CompressionEmulationUtil.getJobOutputCompressionEmulationRatio(conf),
         0.0D);
   }
   


Reply via email to