Author: acmurthy Date: Tue Sep 25 16:27:41 2012 New Revision: 1389970 URL: http://svn.apache.org/viewvc?rev=1389970&view=rev Log: Merge -c 1368724 from branch-1 to branch-1.1 to fix MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
Modified: hadoop/common/branches/branch-1.1/CHANGES.txt hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Modified: hadoop/common/branches/branch-1.1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1389970&r1=1389969&r2=1389970&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 16:27:41 2012 @@ -163,6 +163,9 @@ Release 1.1.0 - 2012.09.16 HDFS-3617. Port HDFS-96 to branch-1 (support blocks greater than 2GB). (Patrick Kling and harsh via eli) + MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler. + (Todd Lipcon and Brandon Li via sseth) + BUG FIXES HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1389970&r1=1389969&r2=1389970&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep 25 16:27:41 2012 @@ -73,7 +73,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; import org.apache.hadoop.io.SecureIOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; @@ -341,6 +344,9 @@ public class TaskTracker 
implements MRCo "mapreduce.tasktracker.outofband.heartbeat.damper"; static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000; private volatile int oobHeartbeatDamper; + private boolean manageOsCacheInShuffle = false; + private int readaheadLength; + private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); // Track number of completed tasks to send an out-of-band heartbeat private AtomicInteger finishedCount = new AtomicInteger(0); @@ -881,6 +887,12 @@ public class TaskTracker implements MRCo oobHeartbeatDamper = fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, DEFAULT_OOB_HEARTBEAT_DAMPER); + manageOsCacheInShuffle = fConf.getBoolean( + "mapreduce.shuffle.manage.os.cache", + true); + readaheadLength = fConf.getInt( + "mapreduce.shuffle.readahead.bytes", + 4 * 1024 * 1024); } private void startJettyBugMonitor() { @@ -3978,16 +3990,30 @@ public class TaskTracker implements MRCo * send it to the reducer. */ //open the map-output file + String filePath = mapOutputFileName.toUri().getPath(); mapOutputIn = SecureIOUtils.openForRead( - new File(mapOutputFileName.toUri().getPath()), runAsUserName); + new File(filePath), runAsUserName); + //new File(mapOutputFileName.toUri().getPath()), runAsUserName); + ReadaheadRequest curReadahead = null; + //seek to the correct offset for the reduce mapOutputIn.skip(info.startOffset); long rem = info.partLength; - int len = - mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ)); - while (rem > 0 && len >= 0) { + long offset = info.startOffset; + while (rem > 0) { + if (tracker.manageOsCacheInShuffle && tracker.readaheadPool != null) { + curReadahead = tracker.readaheadPool.readaheadStream(filePath, + mapOutputIn.getFD(), offset, tracker.readaheadLength, + info.startOffset + info.partLength, curReadahead); + } + int len = mapOutputIn.read(buffer, 0, + (int) Math.min(rem, MAX_BYTES_TO_READ)); + if (len < 0) { + break; + } rem -= len; + offset += len; try { shuffleMetrics.outputBytes(len); outStream.write(buffer, 
0, len); @@ -3997,10 +4023,18 @@ public class TaskTracker implements MRCo throw ie; } totalRead += len; - len = - mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ)); } + if (curReadahead != null) { + curReadahead.cancel(); + } + + // drop cache if possible + if (tracker.manageOsCacheInShuffle && info.partLength > 0) { + NativeIO.posixFadviseIfPossible(mapOutputIn.getFD(), + info.startOffset, info.partLength, NativeIO.POSIX_FADV_DONTNEED); + } + if (LOG.isDebugEnabled()) { LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + " from map: " + mapId + " given " + info.partLength + "/" +