Author: cnauroth
Date: Mon Mar 24 18:57:52 2014
New Revision: 1580994

URL: http://svn.apache.org/r1580994
Log:
MAPREDUCE-5791. Shuffle phase is slow in Windows - 
FadviseFileRegion::transferTo does not read disks efficiently. Contributed by 
Nikola Vujic.

Added:
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1580994&r1=1580993&r2=1580994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Mar 24 
18:57:52 2014
@@ -259,6 +259,10 @@ Release 2.4.0 - UNRELEASED
     override HADOOP_ROOT_LOGGER or HADOOP_CLIENT_OPTS. (Varun Vasudev via
     vinodkv)
 
+    MAPREDUCE-5791. Shuffle phase is slow in Windows -
+    FadviseFileRegion::transferTo does not read disks efficiently.
+    (Nikola Vujic via cnauroth)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1580994&r1=1580993&r2=1580994&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 Mon Mar 24 18:57:52 2014
@@ -343,6 +343,30 @@
 </property>
 
 <property>
+  <name>mapreduce.shuffle.transferTo.allowed</name>
+  <value></value>
+  <description>This option can enable/disable using the NIO transferTo method 
+  in the shuffle phase. NIO transferTo does not perform well on Windows in the 
+  shuffle phase. Thus, with this configuration property it is possible to 
+  disable it, in which case a custom transfer method will be used. Recommended 
+  value is false when running Hadoop on Windows. For Linux, it is recommended 
+  to set it to true. If nothing is set then the default value is false for 
+  Windows, and true for Linux.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.shuffle.transfer.buffer.size</name>
+  <value>131072</value>
+  <description>This property is used only if 
+  mapreduce.shuffle.transferTo.allowed is set to false. In that case, 
+  this property defines the size of the buffer used in the buffer copy code
+  for the shuffle phase. The size of this buffer determines the size of the IO
+  requests.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.reduce.markreset.buffer.percent</name>
   <value>0.0</value>
   <description>The percentage of memory -relative to the maximum heap size- to

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java?rev=1580994&r1=1580993&r2=1580994&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
 Mon Mar 24 18:57:52 2014
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapred;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
 
 import org.apache.commons.logging.Log;
@@ -30,6 +32,8 @@ import org.apache.hadoop.io.ReadaheadPoo
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.jboss.netty.channel.DefaultFileRegion;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class FadvisedFileRegion extends DefaultFileRegion {
 
   private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
@@ -39,18 +43,29 @@ public class FadvisedFileRegion extends 
   private final ReadaheadPool readaheadPool;
   private final FileDescriptor fd;
   private final String identifier;
-
+  private final long count;
+  private final long position;
+  private final int shuffleBufferSize;
+  private final boolean shuffleTransferToAllowed;
+  private final FileChannel fileChannel;
+  
   private ReadaheadRequest readaheadRequest;
 
   public FadvisedFileRegion(RandomAccessFile file, long position, long count,
       boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
-      String identifier) throws IOException {
+      String identifier, int shuffleBufferSize, 
+      boolean shuffleTransferToAllowed) throws IOException {
     super(file.getChannel(), position, count);
     this.manageOsCache = manageOsCache;
     this.readaheadLength = readaheadLength;
     this.readaheadPool = readaheadPool;
     this.fd = file.getFD();
     this.identifier = identifier;
+    this.fileChannel = file.getChannel();
+    this.count = count;
+    this.position = position;
+    this.shuffleBufferSize = shuffleBufferSize;
+    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
   }
 
   @Override
@@ -61,9 +76,69 @@ public class FadvisedFileRegion extends 
           getPosition() + position, readaheadLength,
           getPosition() + getCount(), readaheadRequest);
     }
-    return super.transferTo(target, position);
+    
+    if(this.shuffleTransferToAllowed) {
+      return super.transferTo(target, position);
+    } else {
+      return customShuffleTransfer(target, position);
+    } 
+  }
+
+  /**
+   * This method transfers data using a local buffer. It transfers data from 
+   * a disk to a local buffer in memory, and then it transfers data from the 
+   * buffer to the target. This is used only if transferTo is disallowed in
+   * the configuration file. super.transferTo does not perform well on Windows 
+   * due to the small IO requests it generates. customShuffleTransfer can 
+   * control the size of the IO requests by changing the size of the 
+   * intermediate buffer.
+   */
+  @VisibleForTesting
+  long customShuffleTransfer(WritableByteChannel target, long position)
+      throws IOException {
+    long actualCount = this.count - position;
+    if (actualCount < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position +
+          " (expected: 0 - " + (this.count - 1) + ')');
+    }
+    if (actualCount == 0) {
+      return 0L;
+    }
+    
+    long trans = actualCount;
+    int readSize;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+    
+    while(trans > 0L &&
+        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) 
{
+      //adjust counters and buffer limit
+      if(readSize < trans) {
+        trans -= readSize;
+        position += readSize;
+        byteBuffer.flip();
+      } else {
+        //We can read more than we need if the actualCount is not a multiple 
+        //of the byteBuffer size and the file is big enough. In that case we
+        //cannot use the flip method; we set the buffer limit manually to trans.
+        byteBuffer.limit((int)trans);
+        byteBuffer.position(0);
+        position += trans; 
+        trans = 0;
+      }
+      
+      //write data to the target
+      while(byteBuffer.hasRemaining()) {
+        target.write(byteBuffer);
+      }
+      
+      byteBuffer.clear();
+    }
+    
+    return actualCount - trans;
   }
 
+  
   @Override
   public void releaseExternalResources() {
     if (readaheadRequest != null) {

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1580994&r1=1580993&r2=1580994&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
 Mon Mar 24 18:57:52 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.metrics2.lib.Mu
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
@@ -144,6 +145,8 @@ public class ShuffleHandler extends Auxi
   private boolean manageOsCache;
   private int readaheadLength;
   private int maxShuffleConnections;
+  private int shuffleBufferSize;
+  private boolean shuffleTransferToAllowed;
   private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
@@ -183,6 +186,17 @@ public class ShuffleHandler extends Auxi
   public static final String MAX_SHUFFLE_THREADS = 
"mapreduce.shuffle.max.threads";
   // 0 implies Netty default of 2 * number of available processors
   public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
+  
+  public static final String SHUFFLE_BUFFER_SIZE = 
+      "mapreduce.shuffle.transfer.buffer.size";
+  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+  
+  public static final String  SHUFFLE_TRANSFERTO_ALLOWED = 
+      "mapreduce.shuffle.transferTo.allowed";
+  public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
+  public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = 
+      false;
+
   boolean connectionKeepAliveEnabled = false;
   int connectionKeepAliveTimeOut;
   int mapOutputMetaInfoCacheSize;
@@ -310,6 +324,13 @@ public class ShuffleHandler extends Auxi
     if (maxShuffleThreads == 0) {
       maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
     }
+    
+    shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE, 
+                                    DEFAULT_SHUFFLE_BUFFER_SIZE);
+        
+    shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
+         (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
+                         DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
 
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
       .setNameFormat("ShuffleHandler Netty Boss #%d")
@@ -746,7 +767,8 @@ public class ShuffleHandler extends Auxi
       if (ch.getPipeline().get(SslHandler.class) == null) {
         final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
             info.startOffset, info.partLength, manageOsCache, readaheadLength,
-            readaheadPool, spillfile.getAbsolutePath());
+            readaheadPool, spillfile.getAbsolutePath(), 
+            shuffleBufferSize, shuffleTransferToAllowed);
         writeFuture = ch.write(partition);
         writeFuture.addListener(new ChannelFutureListener() {
             // TODO error handling; distinguish IO/connection failures,

Added: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java?rev=1580994&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
 (added)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
 Mon Mar 24 18:57:52 2014
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.WritableByteChannel;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFadvisedFileRegion {
+  private final int FILE_SIZE = 16*1024*1024;
+  private static final Log LOG = 
+      LogFactory.getLog(TestFadvisedFileRegion.class);
+  
+  @Test(timeout = 100000)
+  public void testCustomShuffleTransfer() throws IOException {
+    File absLogDir = new File("target", 
+        TestFadvisedFileRegion.class.getSimpleName() + 
+        "LocDir").getAbsoluteFile();
+    
+    String testDirPath =
+        StringUtils.join(Path.SEPARATOR,
+            new String[] { absLogDir.getAbsolutePath(),
+                "testCustomShuffleTransfer"});
+    File testDir = new File(testDirPath);
+    testDir.mkdirs();
+    
+    System.out.println(testDir.getAbsolutePath());
+    
+    File inFile = new File(testDir, "fileIn.out");
+    File outFile = new File(testDir, "fileOut.out");
+    
+    
+    //Initialize input file
+    byte [] initBuff = new byte[FILE_SIZE];
+    Random rand = new Random();
+    rand.nextBytes(initBuff);
+    
+    FileOutputStream out = new FileOutputStream(inFile);
+    try{
+      out.write(initBuff);  
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+    
+    
+    //define position and count to read from a file region.
+    int position = 2*1024*1024;
+    int count = 4*1024*1024 - 1;
+    
+    RandomAccessFile inputFile = null;
+    RandomAccessFile targetFile = null;
+    WritableByteChannel target = null;
+    FadvisedFileRegion fileRegion = null;
+    
+    try {
+      inputFile = new RandomAccessFile(inFile.getAbsolutePath(), "r");
+      targetFile = new RandomAccessFile(outFile.getAbsolutePath(), "rw");
+      target = targetFile.getChannel();
+      
+      Assert.assertEquals(FILE_SIZE, inputFile.length());
+      
+      //create FadvisedFileRegion
+      fileRegion = new FadvisedFileRegion(
+          inputFile, position, count, false, 0, null, null, 1024, false);
+      
+      //test corner cases
+      customShuffleTransferCornerCases(fileRegion, target, count);
+            
+      long pos = 0;
+      long size;
+      while((size = fileRegion.customShuffleTransfer(target, pos)) > 0) {
+        pos += size; 
+      }
+    
+      //assert size
+      Assert.assertEquals(count, (int)pos);
+      Assert.assertEquals(count, targetFile.length());
+    } finally {
+      if (fileRegion != null) {
+        fileRegion.releaseExternalResources();
+      }
+      IOUtils.cleanup(LOG, target);
+      IOUtils.cleanup(LOG, targetFile);
+      IOUtils.cleanup(LOG, inputFile);
+    }
+    
+    //Read the target file and verify that copy is done correctly
+    byte [] buff = new byte[FILE_SIZE];
+    FileInputStream in = new FileInputStream(outFile);
+    try {
+      int total = in.read(buff, 0, count);
+    
+      Assert.assertEquals(count, total);
+    
+      for(int i = 0; i < count; i++) {
+        Assert.assertEquals(initBuff[position+i], buff[i]);
+      }
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+    
+    //delete files and folders
+    inFile.delete();
+    outFile.delete();
+    testDir.delete();
+    absLogDir.delete();
+  }
+  
+  private static void customShuffleTransferCornerCases(
+      FadvisedFileRegion fileRegion, WritableByteChannel target, int count) {
+    try {
+      fileRegion.customShuffleTransfer(target, -1);
+      Assert.fail("Expected a IllegalArgumentException");
+    } catch (IllegalArgumentException ie) {
+      LOG.info("Expected - illegal argument is passed.");
+    } catch (Exception e) {
+      Assert.fail("Expected a IllegalArgumentException");
+    }
+
+    //test corner cases
+    try {
+      fileRegion.customShuffleTransfer(target, count + 1);
+      Assert.fail("Expected a IllegalArgumentException");
+    } catch (IllegalArgumentException ie) {
+      LOG.info("Expected - illegal argument is passed.");
+    } catch (Exception e) {
+      Assert.fail("Expected a IllegalArgumentException");
+    }
+  }
+}


Reply via email to