Author: vinodkv
Date: Fri Mar 21 21:44:24 2014
New Revision: 1580063

URL: http://svn.apache.org/r1580063
Log:
MAPREDUCE-5787. Added the ability to keep alive shuffle connections in the 
MapReduce shuffle-handler. Contributed by Rajesh Balamohan.
svn merge --ignore-ancestry -c 1580062 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1580063&r1=1580062&r2=1580063&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt 
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri 
Mar 21 21:44:24 2014
@@ -34,6 +34,9 @@ Release 2.4.0 - UNRELEASED
 
   NEW FEATURES
 
+    MAPREDUCE-5787. Added the ability to keep alive shuffle connections in the
+    MapReduce shuffle-handler. (Rajesh Balamohan via vinodkv)
+
   IMPROVEMENTS
 
     MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1580063&r1=1580062&r2=1580063&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 Fri Mar 21 21:44:24 2014
@@ -363,6 +363,21 @@
 </property>
 
 <property>
+  <name>mapreduce.shuffle.connection-keep-alive.enable</name>
+  <value>false</value>
+  <description>Set to true to support keep-alive connections.</description>
+</property>
+
+<property>
+  <name>mapreduce.shuffle.connection-keep-alive.timeout</name>
+  <value>5</value>
+  <description>The number of seconds a shuffle client attempts to retain
+   the HTTP connection. Refer to the "Keep-Alive: timeout=" header in
+   the HTTP specification.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.task.timeout</name>
   <value>600000</value>
   <description>The number of milliseconds before a task will be

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1580063&r1=1580062&r2=1580063&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
 Fri Mar 21 21:44:24 2014
@@ -23,7 +23,6 @@ import static org.jboss.netty.handler.co
 import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
 import static 
org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static 
org.jboss.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED;
 import static 
org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static 
org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
@@ -41,6 +40,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -110,6 +110,7 @@ import org.jboss.netty.handler.codec.htt
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
+import org.mortbay.jetty.HttpHeaders;
 
 import com.google.common.base.Charsets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -156,6 +157,21 @@ public class ShuffleHandler extends Auxi
   public static final String SHUFFLE_PORT_CONFIG_KEY = 
"mapreduce.shuffle.port";
   public static final int DEFAULT_SHUFFLE_PORT = 13562;
 
+  public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED =
+      "mapreduce.shuffle.connection-keep-alive.enable";
+  public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = 
false;
+
+  public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT =
+      "mapreduce.shuffle.connection-keep-alive.timeout";
+  public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5; 
//seconds
+
+  public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
+      "mapreduce.shuffle.mapoutput-info.meta.cache.size";
+  public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
+      1000;
+
+  public static final String CONNECTION_CLOSE = "close";
+
   public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
     "mapreduce.shuffle.ssl.file.buffer.size";
 
@@ -167,6 +183,9 @@ public class ShuffleHandler extends Auxi
   public static final String MAX_SHUFFLE_THREADS = 
"mapreduce.shuffle.max.threads";
   // 0 implies Netty default of 2 * number of available processors
   public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
+  boolean connectionKeepAliveEnabled = false;
+  int connectionKeepAliveTimeOut;
+  int mapOutputMetaInfoCacheSize;
 
   @Metrics(about="Shuffle output metrics", context="mapred")
   static class ShuffleMetrics implements ChannelFutureListener {
@@ -328,6 +347,15 @@ public class ShuffleHandler extends Auxi
 
     sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
                                     DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+    connectionKeepAliveEnabled =
+        conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
+          DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
+    connectionKeepAliveTimeOut =
+        Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
+          DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
+    mapOutputMetaInfoCacheSize =
+        Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
+          DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
   }
 
   @Override
@@ -459,6 +487,15 @@ public class ShuffleHandler extends Auxi
       }
       final Map<String,List<String>> q =
         new QueryStringDecoder(request.getUri()).getParameters();
+      final List<String> keepAliveList = q.get("keepAlive");
+      boolean keepAliveParam = false;
+      if (keepAliveList != null && keepAliveList.size() == 1) {
+        keepAliveParam = Boolean.valueOf(keepAliveList.get(0));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("KeepAliveParam : " + keepAliveList
+            + " : " + keepAliveParam);
+        }
+      }
       final List<String> mapIds = splitMaps(q.get("map"));
       final List<String> reduceQ = q.get("reduce");
       final List<String> jobQ = q.get("job");
@@ -466,7 +503,8 @@ public class ShuffleHandler extends Auxi
         LOG.debug("RECV: " + request.getUri() +
             "\n  mapId: " + mapIds +
             "\n  reduceId: " + reduceQ +
-            "\n  jobId: " + jobQ);
+            "\n  jobId: " + jobQ +
+            "\n  keepAlive: " + keepAliveParam);
       }
 
       if (mapIds == null || reduceQ == null || jobQ == null) {
@@ -505,27 +543,46 @@ public class ShuffleHandler extends Auxi
         return;
       }
 
+      Map<String, MapOutputInfo> mapOutputInfoMap =
+          new HashMap<String, MapOutputInfo>();
       Channel ch = evt.getChannel();
+      String user = userRsrc.get(jobId);
+
+      // $x/$user/appcache/$appId/output/$mapId
+      // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
+      // between App and Job
+      String outputBasePathStr = getBaseLocation(jobId, user);
+
+      try {
+        populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
+          response, keepAliveParam, mapOutputInfoMap);
+      } catch(IOException e) {
+        ch.write(response);
+        LOG.error("Shuffle error in populating headers :", e);
+        String errorMessage = getErrorMessage(e);
+        sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
+        return;
+      }
       ch.write(response);
       // TODO refactor the following into the pipeline
       ChannelFuture lastMap = null;
       for (String mapId : mapIds) {
         try {
+          MapOutputInfo info = mapOutputInfoMap.get(mapId);
+          if (info == null) {
+            info = getMapOutputInfo(outputBasePathStr + mapId, mapId, reduceId, user);
+          }
           lastMap =
-            sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, 
reduceId);
+              sendMapOutput(ctx, ch, user, mapId,
+                reduceId, info);
           if (null == lastMap) {
             sendError(ctx, NOT_FOUND);
             return;
           }
         } catch (IOException e) {
           LOG.error("Shuffle error :", e);
-          StringBuffer sb = new StringBuffer(e.getMessage());
-          Throwable t = e;
-          while (t.getCause() != null) {
-            sb.append(t.getCause().getMessage());
-            t = t.getCause();
-          }
-          sendError(ctx,sb.toString() , INTERNAL_SERVER_ERROR);
+          String errorMessage = getErrorMessage(e);
+          sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
           return;
         }
       }
@@ -533,6 +590,99 @@ public class ShuffleHandler extends Auxi
       lastMap.addListener(ChannelFutureListener.CLOSE);
     }
 
+    private String getErrorMessage(Throwable t) {
+      StringBuffer sb = new StringBuffer(t.getMessage());
+      while (t.getCause() != null) {
+        sb.append(t.getCause().getMessage());
+        t = t.getCause();
+      }
+      return sb.toString();
+    }
+
+    private String getBaseLocation(String jobId, String user) {
+      final JobID jobID = JobID.forName(jobId);
+      final ApplicationId appID =
+          ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()),
+            jobID.getId());
+      final String baseStr =
+          ContainerLocalizer.USERCACHE + "/" + user + "/"
+              + ContainerLocalizer.APPCACHE + "/"
+              + ConverterUtils.toString(appID) + "/output" + "/";
+      return baseStr;
+    }
+
+    protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+        int reduce, String user) throws IOException {
+      // Index file
+      Path indexFileName =
+          lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+      IndexRecord info =
+          indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+
+      Path mapOutputFileName =
+          lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
+      }
+      MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
+      return outputInfo;
+    }
+
+    protected void populateHeaders(List<String> mapIds, String outputBaseStr,
+        String user, int reduce, HttpRequest request, HttpResponse response,
+        boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
+        throws IOException {
+
+      long contentLength = 0;
+      for (String mapId : mapIds) {
+        String base = outputBaseStr + mapId;
+        MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
+        if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
+          mapOutputInfoMap.put(mapId, outputInfo);
+        }
+        // Index file
+        Path indexFileName =
+            lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+        IndexRecord info =
+            indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+        ShuffleHeader header =
+            new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
+        DataOutputBuffer dob = new DataOutputBuffer();
+        header.write(dob);
+
+        contentLength += info.partLength;
+        contentLength += dob.getLength();
+      }
+
+      // Now set the response headers.
+      setResponseHeaders(response, keepAliveParam, contentLength);
+    }
+
+    protected void setResponseHeaders(HttpResponse response,
+        boolean keepAliveParam, long contentLength) {
+      if (!connectionKeepAliveEnabled && !keepAliveParam) {
+        LOG.info("Setting connection close header...");
+        response.setHeader(HttpHeaders.CONNECTION, CONNECTION_CLOSE);
+      } else {
+        response.setHeader(HttpHeaders.CONTENT_LENGTH,
+          String.valueOf(contentLength));
+        response.setHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
+        response.setHeader(HttpHeaders.KEEP_ALIVE, "timeout="
+            + connectionKeepAliveTimeOut);
+        LOG.info("Content Length in shuffle : " + contentLength);
+      }
+    }
+
+    class MapOutputInfo {
+      final Path mapOutputFileName;
+      final IndexRecord indexRecord;
+
+      MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
+        this.mapOutputFileName = mapOutputFileName;
+        this.indexRecord = indexRecord;
+      }
+    }
+
     protected void verifyRequest(String appid, ChannelHandlerContext ctx,
         HttpRequest request, HttpResponse response, URL requestUri)
         throws IOException {
@@ -575,39 +725,16 @@ public class ShuffleHandler extends Auxi
     }
 
     protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel 
ch,
-        String user, String jobId, String mapId, int reduce)
+        String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
         throws IOException {
-      // TODO replace w/ rsrc alloc
-      // $x/$user/appcache/$appId/output/$mapId
-      // TODO: Once Shuffle is out of NM, this can use MR APIs to convert 
between App and Job
-      JobID jobID = JobID.forName(jobId);
-      ApplicationId appID = ApplicationId.newInstance(
-          Long.parseLong(jobID.getJtIdentifier()), jobID.getId());
-      final String base =
-          ContainerLocalizer.USERCACHE + "/" + user + "/"
-              + ContainerLocalizer.APPCACHE + "/"
-              + ConverterUtils.toString(appID) + "/output" + "/" + mapId;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("DEBUG0 " + base);
-      }
-      // Index file
-      Path indexFileName = lDirAlloc.getLocalPathToRead(
-          base + "/file.out.index", conf);
-      // Map-output file
-      Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-          base + "/file.out", conf);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : "
-            + indexFileName);
-      }
-      final IndexRecord info = 
-        indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+      final IndexRecord info = mapOutputInfo.indexRecord;
       final ShuffleHeader header =
         new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
       final DataOutputBuffer dob = new DataOutputBuffer();
       header.write(dob);
       ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-      final File spillfile = new File(mapOutputFileName.toString());
+      final File spillfile =
+          new File(mapOutputInfo.mapOutputFileName.toString());
       RandomAccessFile spill;
       try {
         spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1580063&r1=1580062&r2=1580063&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
 Fri Mar 21 21:44:24 2014
@@ -23,6 +23,8 @@ import static org.apache.hadoop.test.Met
 import static org.apache.hadoop.test.MockitoMaker.make;
 import static org.apache.hadoop.test.MockitoMaker.stub;
 import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assume.assumeTrue;
 
@@ -39,6 +41,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.zip.CheckedOutputStream;
 import java.util.zip.Checksum;
 
@@ -69,17 +72,24 @@ import org.apache.hadoop.yarn.server.nod
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
 import org.jboss.netty.handler.codec.http.HttpRequest;
 import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.junit.Assert;
 import org.junit.Test;
-
+import org.mortbay.jetty.HttpHeaders;
 
 public class TestShuffleHandler {
   static final long MiB = 1024 * 1024; 
   private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
 
+  /**
+   * Test the validation of ShuffleHandler's meta-data's serialization and
+   * de-serialization.
+   *
+   * @throws Exception exception
+   */
   @Test (timeout = 10000)
   public void testSerializeMeta()  throws Exception {
     assertEquals(1, ShuffleHandler.deserializeMetaData(
@@ -90,6 +100,11 @@ public class TestShuffleHandler {
         ShuffleHandler.serializeMetaData(8080)));
   }
 
+  /**
+   * Validate shuffle connection and input/output metrics.
+   *
+   * @throws Exception exception
+   */
   @Test (timeout = 10000)
   public void testShuffleMetrics() throws Exception {
     MetricsSystem ms = new MetricsSystemImpl();
@@ -120,6 +135,11 @@ public class TestShuffleHandler {
     assertGauge("ShuffleConnections", connections, rb);
   }
 
+  /**
+   * Verify client prematurely closing a connection.
+   *
+   * @throws Exception exception.
+   */
   @Test (timeout = 10000)
   public void testClientClosesConnection() throws Exception {
     final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
@@ -131,13 +151,28 @@ public class TestShuffleHandler {
         // replace the shuffle handler with one stubbed for testing
         return new Shuffle(conf) {
           @Override
+          protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+              int reduce, String user) throws IOException {
+            return null;
+          }
+          @Override
+          protected void populateHeaders(List<String> mapIds, String jobId,
+              String user, int reduce, HttpRequest request,
+              HttpResponse response, boolean keepAliveParam,
+              Map<String, MapOutputInfo> infoMap) throws IOException {
+            // Only set response headers and skip everything else
+            // send some dummy value for content-length
+            super.setResponseHeaders(response, keepAliveParam, 100);
+          }
+          @Override
           protected void verifyRequest(String appid, ChannelHandlerContext ctx,
               HttpRequest request, HttpResponse response, URL requestUri)
                   throws IOException {
           }
           @Override
           protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-              Channel ch, String user, String jobId, String mapId, int reduce)
+              Channel ch, String user, String mapId, int reduce,
+              MapOutputInfo info)
                   throws IOException {
             // send a shuffle header and a lot of data down the channel
             // to trigger a broken pipe
@@ -147,7 +182,7 @@ public class TestShuffleHandler {
             header.write(dob);
             ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
             dob = new DataOutputBuffer();
-            for (int i=0; i<100000; ++i) {
+            for (int i = 0; i < 100000; ++i) {
               header.write(dob);
             }
             return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
@@ -187,6 +222,7 @@ public class TestShuffleHandler {
     conn.connect();
     DataInputStream input = new DataInputStream(conn.getInputStream());
     Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    Assert.assertEquals("close", conn.getHeaderField(HttpHeaders.CONNECTION));
     ShuffleHeader header = new ShuffleHeader();
     header.readFields(input);
     input.close();
@@ -196,6 +232,147 @@ public class TestShuffleHandler {
         failures.size() == 0);
   }
 
+  @Test(timeout = 10000)
+  public void testKeepAlive() throws Exception {
+    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, 
true);
+    // try setting to -ve keep alive timeout.
+    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+      @Override
+      protected Shuffle getShuffle(final Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+              int reduce, String user) throws IOException {
+            return null;
+          }
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+              HttpRequest request, HttpResponse response, URL requestUri)
+              throws IOException {
+          }
+
+          @Override
+          protected void populateHeaders(List<String> mapIds, String jobId,
+              String user, int reduce, HttpRequest request,
+              HttpResponse response, boolean keepAliveParam,
+              Map<String, MapOutputInfo> infoMap) throws IOException {
+            // Send some dummy data (populate content length details)
+            ShuffleHeader header =
+                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+            DataOutputBuffer dob = new DataOutputBuffer();
+            header.write(dob);
+            dob = new DataOutputBuffer();
+            for (int i = 0; i < 100000; ++i) {
+              header.write(dob);
+            }
+
+            long contentLength = dob.getLength();
+            // for testing purpose;
+            // disable connectionKeepAliveEnabled if keepAliveParam is available
+            if (keepAliveParam) {
+              connectionKeepAliveEnabled = false;
+            }
+
+            super.setResponseHeaders(response, keepAliveParam, contentLength);
+          }
+
+          @Override
+          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+              Channel ch, String user, String mapId, int reduce,
+              MapOutputInfo info) throws IOException {
+            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+
+            // send a shuffle header and a lot of data down the channel
+            // to trigger a broken pipe
+            ShuffleHeader header =
+                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+            DataOutputBuffer dob = new DataOutputBuffer();
+            header.write(dob);
+            ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            dob = new DataOutputBuffer();
+            for (int i = 0; i < 100000; ++i) {
+              header.write(dob);
+            }
+            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+          }
+
+          @Override
+          protected void sendError(ChannelHandlerContext ctx,
+              HttpResponseStatus status) {
+            if (failures.size() == 0) {
+              failures.add(new Error());
+              ctx.getChannel().close();
+            }
+          }
+
+          @Override
+          protected void sendError(ChannelHandlerContext ctx, String message,
+              HttpResponseStatus status) {
+            if (failures.size() == 0) {
+              failures.add(new Error());
+              ctx.getChannel().close();
+            }
+          }
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    shuffleHandler.start();
+
+    String shuffleBaseURL = "http://127.0.0.1:"
+            + shuffleHandler.getConfig().get(
+              ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
+    URL url =
+        new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+            + "map=attempt_12345_1_m_1_0");
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+      ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+      ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    conn.connect();
+    DataInputStream input = new DataInputStream(conn.getInputStream());
+    Assert.assertEquals(HttpHeaders.KEEP_ALIVE,
+      conn.getHeaderField(HttpHeaders.CONNECTION));
+    Assert.assertEquals("timeout=1",
+      conn.getHeaderField(HttpHeaders.KEEP_ALIVE));
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    ShuffleHeader header = new ShuffleHeader();
+    header.readFields(input);
+    input.close();
+
+    // For keepAlive via URL
+    url =
+        new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+            + "map=attempt_12345_1_m_1_0&keepAlive=true");
+    conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+      ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+      ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    conn.connect();
+    input = new DataInputStream(conn.getInputStream());
+    Assert.assertEquals(HttpHeaders.KEEP_ALIVE,
+      conn.getHeaderField(HttpHeaders.CONNECTION));
+    Assert.assertEquals("timeout=1",
+      conn.getHeaderField(HttpHeaders.KEEP_ALIVE));
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    header = new ShuffleHeader();
+    header.readFields(input);
+    input.close();
+  }
+
+  /**
+   * simulate a reducer that sends an invalid shuffle-header - sometimes a 
wrong
+   * header_name and sometimes a wrong version
+   * 
+   * @throws Exception exception
+   */
   @Test (timeout = 10000)
   public void testIncompatibleShuffleVersion() throws Exception {
     final int failureNum = 3;
@@ -224,7 +401,12 @@ public class TestShuffleHandler {
     shuffleHandler.stop();
     shuffleHandler.close();
   }
-  
+
+  /**
+   * Validate the limit on number of shuffle connections.
+   * 
+   * @throws Exception exception
+   */
   @Test (timeout = 10000)
   public void testMaxConnections() throws Exception {
     
@@ -237,13 +419,28 @@ public class TestShuffleHandler {
         // replace the shuffle handler with one stubbed for testing
         return new Shuffle(conf) {
           @Override
+          protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+              int reduce, String user) throws IOException {
+            // Do nothing.
+            return null;
+          }
+          @Override
+          protected void populateHeaders(List<String> mapIds, String jobId,
+              String user, int reduce, HttpRequest request,
+              HttpResponse response, boolean keepAliveParam,
+              Map<String, MapOutputInfo> infoMap) throws IOException {
+            // Do nothing.
+          }
+          @Override
           protected void verifyRequest(String appid, ChannelHandlerContext ctx,
               HttpRequest request, HttpResponse response, URL requestUri)
                   throws IOException {
+            // Do nothing.
           }
           @Override
           protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-              Channel ch, String user, String jobId, String mapId, int reduce)
+              Channel ch, String user, String mapId, int reduce,
+              MapOutputInfo info)
                   throws IOException {
             // send a shuffle header and a lot of data down the channel
             // to trigger a broken pipe
@@ -308,7 +505,13 @@ public class TestShuffleHandler {
     
     shuffleHandler.stop(); 
   }
-  
+
+  /**
+   * Validate the ownership of the map-output files being pulled in. The
+   * file's local-file-system owner should match the user component of its path.
+   *
+   * @throws Exception exception
+   */
   @Test(timeout = 100000)
   public void testMapFileAccess() throws IOException {
     // This will run only in NativeIO is enabled as SecureIOUtils need it
@@ -323,7 +526,7 @@ public class TestShuffleHandler {
         TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
     conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
     ApplicationId appId = ApplicationId.newInstance(12345, 1);
-    System.out.println(appId.toString());
+    LOG.info(appId.toString());
     String appAttemptId = "attempt_12345_1_m_1_0";
     String user = "randomUser";
     String reducerId = "0";
@@ -341,6 +544,7 @@ public class TestShuffleHandler {
           protected void verifyRequest(String appid, ChannelHandlerContext ctx,
               HttpRequest request, HttpResponse response, URL requestUri)
               throws IOException {
+            // Do nothing.
           }
 
         };
@@ -393,7 +597,7 @@ public class TestShuffleHandler {
     }
   }
 
-  public static void createShuffleHandlerFiles(File logDir, String user,
+  private static void createShuffleHandlerFiles(File logDir, String user,
       String appId, String appAttemptId, Configuration conf,
       List<File> fileMap) throws IOException {
     String attemptDir =
@@ -412,8 +616,8 @@ public class TestShuffleHandler {
     createMapOutputFile(mapOutputFile, conf);
   }
 
-  public static void
-      createMapOutputFile(File mapOutputFile, Configuration conf)
+  private static void
+    createMapOutputFile(File mapOutputFile, Configuration conf)
           throws IOException {
     FileOutputStream out = new FileOutputStream(mapOutputFile);
     out.write("Creating new dummy map output file. Used only for testing"
@@ -422,7 +626,7 @@ public class TestShuffleHandler {
     out.close();
   }
 
-  public static void createIndexFile(File indexFile, Configuration conf)
+  private static void createIndexFile(File indexFile, Configuration conf)
       throws IOException {
     if (indexFile.exists()) {
       System.out.println("Deleting existing file");


Reply via email to