Author: acmurthy
Date: Tue Sep 25 16:32:22 2012
New Revision: 1389977

URL: http://svn.apache.org/viewvc?rev=1389977&view=rev
Log:
Merge -c 1373672 from branch-1 to branch-1.1 to fix MAPREDUCE-4511. Add IFile readahead.

Modified:
    hadoop/common/branches/branch-1.1/CHANGES.txt
    hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 16:32:22 2012
@@ -166,6 +166,8 @@ Release 1.1.0 - 2012.09.16
     MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
     (Todd Lipcon and Brandon Li via sseth)
 
+    MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
+
   BUG FIXES
 
     HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations

Modified: hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/mapred-default.xml Tue Sep 25 16:32:22 2012
@@ -972,7 +972,21 @@
     acceptable.
     </description>
   </property>
-  
+
+  <property>
+    <name>mapreduce.ifile.readahead</name>
+    <value>true</value>
+    <description>Configuration key to enable/disable IFile readahead.
+    </description>
+  </property>
+
+  <property>
+    <name>mapreduce.ifile.readahead.bytes</name>
+    <value>4194304</value>
+    <description>Configuration key to set the IFile readahead length in bytes.
+    </description>
+  </property>
+
 <!-- Job Notification Configuration -->
 
 <!--

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Sep 25 16:32:22 2012
@@ -291,7 +291,7 @@ class IFile {
                   CompressionCodec codec,
                   Counters.Counter readsCounter) throws IOException {
       readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length);
+      checksumIn = new IFileInputStream(in,length, conf);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         this.in = codec.createInputStream(checksumIn, decompressor);

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Tue Sep 25 16:32:22 2012
@@ -19,11 +19,20 @@
 package org.apache.hadoop.mapred;
 
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.util.DataChecksum;
 /**
  * A checksum input stream, used for IFiles.
@@ -32,7 +41,8 @@ import org.apache.hadoop.util.DataChecks
 
 class IFileInputStream extends InputStream {
   
-  private final InputStream in; //The input stream to be verified for checksum. 
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
   private final long length; //The total length of the input file
   private final long dataLength;
   private DataChecksum sum;
@@ -40,19 +50,66 @@ class IFileInputStream extends InputStre
   private final byte b[] = new byte[1];
   private byte csum[] = null;
   private int checksumSize;
-  
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String MAPRED_IFILE_READAHEAD_BYTES =
+    "mapreduce.ifile.readahead.bytes";
+
+  public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+    4 * 1024 * 1024;
+
+  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
   /**
    * Create a checksum input stream that reads
    * @param in The input stream to be verified for checksum.
    * @param len The length of the input stream including checksum bytes.
    */
-  public IFileInputStream(InputStream in, long len) {
+  public IFileInputStream(InputStream in, long len, Configuration conf) {
     this.in = in;
+    this.inFd = getFileDescriptorIfAvail(in);
     sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
         Integer.MAX_VALUE);
     checksumSize = sum.getChecksumSize();
     length = len;
     dataLength = length - checksumSize;
+
+    conf = (conf != null) ? conf : new Configuration();
+    readahead = conf.getBoolean(MAPRED_IFILE_READAHEAD,
+        DEFAULT_MAPRED_IFILE_READAHEAD);
+    readaheadLength = conf.getInt(MAPRED_IFILE_READAHEAD_BYTES,
+        DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+    doReadahead();
+  }
+
+  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+    FileDescriptor fd = null;
+    try {
+      if (in instanceof HasFileDescriptor) {
+        fd = ((HasFileDescriptor)in).getFileDescriptor();
+      } else if (in instanceof FileInputStream) {
+        fd = ((FileInputStream)in).getFD();
+      }
+    } catch (IOException e) {
+      LOG.info("Unable to determine FileDescriptor", e);
+    }
+    return fd;
   }
 
   /**
@@ -61,6 +118,10 @@ class IFileInputStream extends InputStre
    */
   @Override
   public void close() throws IOException {
+
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
     if (currentOffset < dataLength) {
       byte[] t = new byte[Math.min((int)
             (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -97,10 +158,21 @@ class IFileInputStream extends InputStre
     if (currentOffset >= dataLength) {
       return -1;
     }
-    
+
+    doReadahead();
+
     return doRead(b,off,len);
   }
 
+  private void doReadahead() {
+    if (raPool != null && inFd != null && readahead) {
+      curReadahead = raPool.readaheadStream(
+          "ifile", inFd,
+          currentOffset, readaheadLength, dataLength,
+          curReadahead);
+    }
+  }
+
   /**
    * Read bytes from the stream.
    * At EOF, checksum is validated and sent back

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep 25 16:32:22 2012
@@ -1684,7 +1684,7 @@ class ReduceTask extends Task {
         }
 
         IFileInputStream checksumIn = 
-          new IFileInputStream(input,compressedLength);
+          new IFileInputStream(input,compressedLength, conf);
 
         input = checksumIn;       
       

Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java?rev=1389977&r1=1389976&r2=1389977&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java Tue Sep 25 16:32:22 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     for (int i = 0; i < DLEN; ++i) {
       assertEquals(i, ifis.read());
     }
@@ -54,7 +55,7 @@ public class TestIFileStreams extends Te
     final byte[] b = dob.getData();
     ++b[17];
     dib.reset(b, DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     int i = 0;
     try {
       while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 100);
+    IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
     int i = 0;
     try {
       while (i < DLEN - 8) {


Reply via email to