Author: acmurthy
Date: Tue Sep 25 16:32:22 2012
New Revision: 1389977

URL: http://svn.apache.org/viewvc?rev=1389977&view=rev
Log:
Merge -c 1373672 from branch-1 to branch-1.1 to fix MAPREDUCE-4511. Add IFile readahead.
Modified:
    hadoop/common/branches/branch-1.1/CHANGES.txt
    hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 16:32:22 2012
@@ -166,6 +166,8 @@ Release 1.1.0 - 2012.09.16
     MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler. (Todd
     Lipcon and Brandon Li via sseth)
 
+    MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
+
   BUG FIXES
 
     HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations

Modified: hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml Tue Sep 25 16:32:22 2012
@@ -972,7 +972,21 @@
     acceptable.
     </description>
 </property>
-
+
+<property>
+  <name>mapreduce.ifile.readahead</name>
+  <value>true</value>
+  <description>Configuration key to enable/disable IFile readahead.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.ifile.readahead.bytes</name>
+  <value>4194304</value>
+  <description>Configuration key to set the IFile readahead length in bytes.
+  </description>
+</property>
+
 <!-- Job Notification Configuration -->
 
 <!--
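The two new keys are read by IFileInputStream and can be overridden per job
through the standard Configuration API. A minimal sketch, assuming only the
key names and defaults added above (the class name and the 8 MB override are
arbitrary illustrations, not part of the patch):

    import org.apache.hadoop.conf.Configuration;

    public class IFileReadaheadConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Keys introduced by MAPREDUCE-4511; defaults: true, 4194304 (4 MB).
        conf.setBoolean("mapreduce.ifile.readahead", true);
        conf.setInt("mapreduce.ifile.readahead.bytes", 8 * 1024 * 1024);
        System.out.println(conf.getInt("mapreduce.ifile.readahead.bytes", -1));
      }
    }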
Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Sep 25 16:32:22 2012
@@ -291,7 +291,7 @@ class IFile {
                   CompressionCodec codec,
                   Counters.Counter readsCounter) throws IOException {
       readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length);
+      checksumIn = new IFileInputStream(in,length, conf);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         this.in = codec.createInputStream(checksumIn, decompressor);

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Tue Sep 25 16:32:22 2012
@@ -19,11 +19,20 @@
 package org.apache.hadoop.mapred;
 
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
  * A checksum input stream, used for IFiles.
@@ -32,7 +41,8 @@ import org.apache.hadoop.util.DataChecks
 class IFileInputStream extends InputStream {
 
-  private final InputStream in; //The input stream to be verified for checksum.
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
   private final long length; //The total length of the input file
   private final long dataLength;
   private DataChecksum sum;
@@ -40,19 +50,66 @@ class IFileInputStre
   private final byte b[] = new byte[1];
   private byte csum[] = null;
   private int checksumSize;
-
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+ */ + public static final String MAPRED_IFILE_READAHEAD_BYTES = + "mapreduce.ifile.readahead.bytes"; + + public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES = + 4 * 1024 * 1024; + + public static final Log LOG = LogFactory.getLog(IFileInputStream.class); + /** * Create a checksum input stream that reads * @param in The input stream to be verified for checksum. * @param len The length of the input stream including checksum bytes. */ - public IFileInputStream(InputStream in, long len) { + public IFileInputStream(InputStream in, long len, Configuration conf) { this.in = in; + this.inFd = getFileDescriptorIfAvail(in); sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, Integer.MAX_VALUE); checksumSize = sum.getChecksumSize(); length = len; dataLength = length - checksumSize; + + conf = (conf != null) ? conf : new Configuration(); + readahead = conf.getBoolean(MAPRED_IFILE_READAHEAD, + DEFAULT_MAPRED_IFILE_READAHEAD); + readaheadLength = conf.getInt(MAPRED_IFILE_READAHEAD_BYTES, + DEFAULT_MAPRED_IFILE_READAHEAD_BYTES); + + doReadahead(); + } + + private static FileDescriptor getFileDescriptorIfAvail(InputStream in) { + FileDescriptor fd = null; + try { + if (in instanceof HasFileDescriptor) { + fd = ((HasFileDescriptor)in).getFileDescriptor(); + } else if (in instanceof FileInputStream) { + fd = ((FileInputStream)in).getFD(); + } + } catch (IOException e) { + LOG.info("Unable to determine FileDescriptor", e); + } + return fd; } /** @@ -61,6 +118,10 @@ class IFileInputStream extends InputStre */ @Override public void close() throws IOException { + + if (curReadahead != null) { + curReadahead.cancel(); + } if (currentOffset < dataLength) { byte[] t = new byte[Math.min((int) (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)]; @@ -97,10 +158,21 @@ class IFileInputStream extends InputStre if (currentOffset >= dataLength) { return -1; } - + + doReadahead(); + return doRead(b,off,len); } + private void doReadahead() { + if (raPool != null && inFd != null && readahead) { + curReadahead = raPool.readaheadStream( + "ifile", inFd, + currentOffset, readaheadLength, dataLength, + curReadahead); + } + } + /** * Read bytes from the stream. 
Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep 25 16:32:22 2012
@@ -1684,7 +1684,7 @@ class ReduceTask extends Task {
         }
 
         IFileInputStream checksumIn =
-          new IFileInputStream(input,compressedLength);
+          new IFileInputStream(input,compressedLength, conf);
         input = checksumIn;
 
Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java Tue Sep 25 16:32:22 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     for (int i = 0; i < DLEN; ++i) {
       assertEquals(i, ifis.read());
     }
@@ -54,7 +55,7 @@ public class TestIFileStreams extends Te
     final byte[] b = dob.getData();
     ++b[17];
     dib.reset(b, DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     int i = 0;
     try {
       while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 100);
+    IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
     int i = 0;
     try {
       while (i < DLEN - 8) {
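For completeness, a round-trip in the spirit of the updated test, sketched
under the assumption that it lives in org.apache.hadoop.mapred (both stream
classes are package-private there); the write loop is inferred from the
assertions in the test, and the class name is arbitrary:

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class IFileStreamRoundTrip {
      public static void main(String[] args) throws Exception {
        final int DLEN = 100;
        DataOutputBuffer dob = new DataOutputBuffer(DLEN + 4);
        IFileOutputStream ifos = new IFileOutputStream(dob);
        for (int i = 0; i < DLEN; ++i) {
          ifos.write(i);
        }
        ifos.close(); // appends the 4-byte CRC32 checksum

        DataInputBuffer dib = new DataInputBuffer();
        dib.reset(dob.getData(), DLEN + 4);
        // New third argument: readahead silently no-ops here because a
        // DataInputBuffer exposes no file descriptor.
        IFileInputStream ifis =
            new IFileInputStream(dib, DLEN + 4, new Configuration());
        for (int i = 0; i < DLEN; ++i) {
          if (ifis.read() != i) {
            throw new AssertionError("mismatch at byte " + i);
          }
        }
        ifis.close();
      }
    }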