Author: acmurthy
Date: Tue Sep 25 16:27:41 2012
New Revision: 1389970

URL: http://svn.apache.org/viewvc?rev=1389970&view=rev
Log:
Merge -c 1368724 from branch-1 to branch-1.1 to fix MAPREDUCE-3289. Make use of
fadvise in the NM's shuffle handler.

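For context: the backported change makes the TaskTracker's map-output servlet
schedule POSIX readahead just ahead of its read position while streaming each
map-output section, and then drop that section from the OS page cache once it
has been sent (the reducer never rereads it). Below is a minimal standalone
sketch of that pattern, using the same ReadaheadPool and NativeIO calls the
patch uses; the harness class, method, and parameters are illustrative only
and are not part of the commit.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.hadoop.io.ReadaheadPool;
    import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
    import org.apache.hadoop.io.nativeio.NativeIO;

    public class FadviseShuffleSketch {
      private static final int MAX_BYTES_TO_READ = 64 * 1024;

      /** Stream file[startOffset, startOffset + partLength) to out. */
      static void sendSection(File file, long startOffset, long partLength,
          int readaheadLength, OutputStream out) throws IOException {
        ReadaheadPool pool = ReadaheadPool.getInstance();
        FileInputStream in = new FileInputStream(file);
        try {
          in.skip(startOffset);
          byte[] buffer = new byte[MAX_BYTES_TO_READ];
          long rem = partLength;
          long offset = startOffset;
          ReadaheadRequest curReadahead = null;
          while (rem > 0) {
            // Hint the kernel to prefetch the next window, bounded by the
            // end of this section; the pool replaces the prior request.
            curReadahead = pool.readaheadStream(file.getPath(), in.getFD(),
                offset, readaheadLength, startOffset + partLength,
                curReadahead);
            int len = in.read(buffer, 0,
                (int) Math.min(rem, MAX_BYTES_TO_READ));
            if (len < 0) {
              break;
            }
            out.write(buffer, 0, len);
            rem -= len;
            offset += len;
          }
          if (curReadahead != null) {
            curReadahead.cancel();
          }
          // This section will not be read again, so evict it from the
          // page cache to leave room for hotter data.
          if (partLength > 0) {
            NativeIO.posixFadviseIfPossible(in.getFD(), startOffset,
                partLength, NativeIO.POSIX_FADV_DONTNEED);
          }
        } finally {
          in.close();
        }
      }
    }

Note that the patch cancels any outstanding readahead request before issuing
POSIX_FADV_DONTNEED, presumably so a still-pending prefetch hint cannot
repopulate the pages it is about to evict.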
Modified:
    hadoop/common/branches/branch-1.1/CHANGES.txt
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1389970&r1=1389969&r2=1389970&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 16:27:41 2012
@@ -163,6 +163,9 @@ Release 1.1.0 - 2012.09.16
     HDFS-3617. Port HDFS-96 to branch-1 (support blocks greater than 2GB).
     (Patrick Kling and harsh via eli)
 
+    MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
+    (Todd Lipcon and Brandon Li via sseth)
+
   BUG FIXES
 
     HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1389970&r1=1389969&r2=1389970&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep 25 16:27:41 2012
@@ -73,7 +73,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -341,6 +344,9 @@ public class TaskTracker implements MRCo
     "mapreduce.tasktracker.outofband.heartbeat.damper";
   static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
   private volatile int oobHeartbeatDamper;
+  private boolean manageOsCacheInShuffle = false;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
   
   // Track number of completed tasks to send an out-of-band heartbeat
   private AtomicInteger finishedCount = new AtomicInteger(0);
@@ -881,6 +887,12 @@ public class TaskTracker implements MRCo
     oobHeartbeatDamper = 
       fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, 
           DEFAULT_OOB_HEARTBEAT_DAMPER);
+    manageOsCacheInShuffle = fConf.getBoolean(
+      "mapreduce.shuffle.manage.os.cache",
+      true);
+    readaheadLength = fConf.getInt(
+      "mapreduce.shuffle.readahead.bytes",
+      4 * 1024 * 1024);
   }
 
   private void startJettyBugMonitor() {
@@ -3978,16 +3990,29 @@ public class TaskTracker implements MRCo
          * send it to the reducer.
          */
         //open the map-output file
+        String filePath = mapOutputFileName.toUri().getPath();
         mapOutputIn = SecureIOUtils.openForRead(
-            new File(mapOutputFileName.toUri().getPath()), runAsUserName);
+            new File(filePath), runAsUserName);
 
+        ReadaheadRequest curReadahead = null;
+        
         //seek to the correct offset for the reduce
         mapOutputIn.skip(info.startOffset);
         long rem = info.partLength;
-        int len =
-          mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
-        while (rem > 0 && len >= 0) {
+        long offset = info.startOffset;
+        while (rem > 0) {
+          if (tracker.manageOsCacheInShuffle && tracker.readaheadPool != null) {
+            curReadahead = tracker.readaheadPool.readaheadStream(filePath,
+                mapOutputIn.getFD(), offset, tracker.readaheadLength,
+                info.startOffset + info.partLength, curReadahead);
+          }
+          int len = mapOutputIn.read(buffer, 0,
+              (int) Math.min(rem, MAX_BYTES_TO_READ));
+          if (len < 0) {
+            break;
+          }
           rem -= len;
+          offset += len;
           try {
             shuffleMetrics.outputBytes(len);
             outStream.write(buffer, 0, len);
@@ -3997,10 +4022,18 @@ public class TaskTracker implements MRCo
             throw ie;
           }
           totalRead += len;
-          len =
-            mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
         
+        if (curReadahead != null) {
+          curReadahead.cancel();
+        }
+
+        // drop cache if possible
+        if (tracker.manageOsCacheInShuffle && info.partLength > 0) {
+          NativeIO.posixFadviseIfPossible(mapOutputIn.getFD(),
+              info.startOffset, info.partLength, NativeIO.POSIX_FADV_DONTNEED);
+        }
+
         if (LOG.isDebugEnabled()) {
         LOG.debug("Sent out " + totalRead + " bytes for reduce: " + reduce + 
                  " from map: " + mapId + " given " + info.partLength + "/" + 


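The change also adds two TaskTracker configuration knobs, read in the
initialize() hunk above: mapreduce.shuffle.manage.os.cache (default true) and
mapreduce.shuffle.readahead.bytes (default 4 MB). A hypothetical programmatic
override, for illustration only; on a real cluster these would be set in the
TaskTracker's mapred-site.xml:

    import org.apache.hadoop.conf.Configuration;

    public class ShuffleCacheSettings {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Defaults from the patch: fadvise-based cache management enabled,
        // with a 4 MB readahead window per shuffle stream.
        conf.setBoolean("mapreduce.shuffle.manage.os.cache", true);
        conf.setInt("mapreduce.shuffle.readahead.bytes", 4 * 1024 * 1024);
      }
    }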