HDFS-11156. Add new op GETFILEBLOCKLOCATIONS to WebHDFS REST API. Contributed 
by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7fcc73fc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7fcc73fc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7fcc73fc

Branch: refs/heads/YARN-3926
Commit: 7fcc73fc0d248aae1edbd4e1514c5818f6198928
Parents: b31e195
Author: Andrew Wang <w...@apache.org>
Authored: Tue Jan 3 09:58:00 2017 -0800
Committer: Andrew Wang <w...@apache.org>
Committed: Tue Jan 3 09:58:00 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/web/JsonUtilClient.java  |  53 ++++
 .../hadoop/hdfs/web/WebHdfsFileSystem.java      |  62 ++++-
 .../hadoop/hdfs/web/resources/GetOpParam.java   |  12 +-
 .../web/resources/NamenodeWebHdfsMethods.java   |  16 ++
 .../org/apache/hadoop/hdfs/web/JsonUtil.java    |  33 +++
 .../hadoop-hdfs/src/site/markdown/WebHDFS.md    | 192 ++++++++++++-
 .../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 276 +++++++++++++++++++
 7 files changed, 638 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
index 246f242..4204c54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectReader;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.ContentSummary.Builder;
 import org.apache.hadoop.fs.FileChecksum;
@@ -637,4 +638,56 @@ class JsonUtilClient {
     }
   }
 
+  static BlockLocation[] toBlockLocationArray(Map<?, ?> json)
+      throws IOException {
+    final Map<?, ?> rootmap =
+        (Map<?, ?>)json.get(BlockLocation.class.getSimpleName() + "s");
+    final List<?> array = JsonUtilClient.getList(rootmap,
+        BlockLocation.class.getSimpleName());
+
+    Preconditions.checkNotNull(array);
+    final BlockLocation[] locations = new BlockLocation[array.size()];
+    int i = 0;
+    for (Object object : array) {
+      final Map<?, ?> m = (Map<?, ?>) object;
+      locations[i++] = JsonUtilClient.toBlockLocation(m);
+    }
+    return locations;
+  }
+
+  /** Convert a JSON map to a BlockLocation. */
+  static BlockLocation toBlockLocation(Map<?, ?> m)
+      throws IOException {
+    if(m == null) {
+      return null;
+    }
+
+    long length = ((Number) m.get("length")).longValue();
+    long offset = ((Number) m.get("offset")).longValue();
+    boolean corrupt = Boolean.parseBoolean(m.get("corrupt").toString());
+    String[] storageIds = toStringArray(getList(m, "storageIds"));
+    String[] cachedHosts = toStringArray(getList(m, "cachedHosts"));
+    String[] hosts = toStringArray(getList(m, "hosts"));
+    String[] names = toStringArray(getList(m, "names"));
+    String[] topologyPaths = toStringArray(getList(m, "topologyPaths"));
+    StorageType[] storageTypes = toStorageTypeArray(
+        getList(m, "storageTypes"));
+    return new BlockLocation(names, hosts, cachedHosts,
+        topologyPaths, storageIds, storageTypes,
+        offset, length, corrupt);
+  }
+
+  static String[] toStringArray(List<?> list) {
+    if (list == null) {
+      return null;
+    } else {
+      final String[] array = new String[list.size()];
+      int i = 0;
+      for (Object object : list) {
+        array[i++] = object.toString();
+      }
+      return array;
+    }
+  }
 }
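
For reference, a minimal client-side sketch of what this parsing amounts to, using Jackson directly in the same way the new TestWebHDFS helper does; the JSON literal, class name and values below are illustrative only, not part of the patch:

```java
import java.util.Map;

import org.apache.hadoop.fs.BlockLocation;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;

public class BlockLocationsJsonParseDemo {
  public static void main(String[] args) throws Exception {
    // Trimmed-down example of the JSON body returned by GETFILEBLOCKLOCATIONS.
    String json = "{\"BlockLocations\":{\"BlockLocation\":[{"
        + "\"cachedHosts\":[],\"corrupt\":false,\"hosts\":[\"host\"],"
        + "\"length\":134217728,\"names\":[\"host:ip\"],\"offset\":0,"
        + "\"storageIds\":[\"storageid\"],\"storageTypes\":[\"DISK\"],"
        + "\"topologyPaths\":[\"/default-rack/host:ip\"]}]}}";

    ObjectMapper mapper = new ObjectMapper();
    // Build the type Map<String, Map<String, BlockLocation[]>> matching the
    // {"BlockLocations": {"BlockLocation": [...]}} layout of the response.
    MapType subType = mapper.getTypeFactory().constructMapType(
        Map.class, String.class, BlockLocation[].class);
    MapType rootType = mapper.getTypeFactory().constructMapType(
        Map.class, mapper.constructType(String.class),
        mapper.constructType(subType));

    Map<String, Map<String, BlockLocation[]>> parsed =
        mapper.readValue(json, rootType);
    BlockLocation[] locations =
        parsed.get("BlockLocations").get("BlockLocation");
    // Prints "0+134217728" for the single block above.
    System.out.println(locations[0].getOffset() + "+"
        + locations[0].getLength());
  }
}
```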

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 26cfc01..d4fa009 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -1611,14 +1611,68 @@ public class WebHdfsFileSystem extends FileSystem
       final long offset, final long length) throws IOException {
     statistics.incrementReadOps(1);
     storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
+    BlockLocation[] locations = null;
+    try {
+      locations = getFileBlockLocations(
+          GetOpParam.Op.GETFILEBLOCKLOCATIONS,
+          p, offset, length);
+    } catch (RemoteException e) {
+      // See the error message produced by ExceptionHandler
+      if (e.getMessage() != null &&
+          e.getMessage().contains(
+              "Invalid value for webhdfs parameter") &&
+          e.getMessage().contains(
+              GetOpParam.Op.GETFILEBLOCKLOCATIONS.toString())) {
+        // An older webhdfs server doesn't support the
+        // GETFILEBLOCKLOCATIONS operation; fall back and query again
+        // using the old GET_BLOCK_LOCATIONS op.
+        LOG.info("Invalid webhdfs operation parameter "
+            + GetOpParam.Op.GETFILEBLOCKLOCATIONS + ". Falling back to "
+            + GetOpParam.Op.GET_BLOCK_LOCATIONS + " instead.");
+        locations = getFileBlockLocations(
+            GetOpParam.Op.GET_BLOCK_LOCATIONS,
+            p, offset, length);
+      } else {
+        // Any other remote error should surface to the caller rather
+        // than being silently swallowed as a null result.
+        throw e;
+      }
+    }
+    return locations;
+  }
 
-    final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
-    return new FsPathResponseRunner<BlockLocation[]>(op, p,
+  /**
+   * Get file block locations implementation. Provide an operation
+   * parameter to determine how to get block locations from a webhdfs
+   * server. Older servers only support <b>GET_BLOCK_LOCATIONS</b> but
+   * not <b>GETFILEBLOCKLOCATIONS</b>.
+   *
+   * @param operation
+   *   Valid operation is either
+   *   {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op
+   *   #GET_BLOCK_LOCATIONS} or
+   *   {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op
+   *   #GETFILEBLOCKLOCATIONS}
+   * @param path path to the file
+   * @param offset start offset in the given file
+   * @param length number of bytes in the range to get locations for
+   * @throws IOException
+   *   Http connection error, decoding error, or the given
+   *   operation is not valid
+   */
+  @VisibleForTesting
+  protected BlockLocation[] getFileBlockLocations(
+      GetOpParam.Op operation, final Path path,
+      final long offset, final long length) throws IOException {
+    return new FsPathResponseRunner<BlockLocation[]>(operation, path,
         new OffsetParam(offset), new LengthParam(length)) {
       @Override
       BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
-        return DFSUtilClient.locatedBlocks2Locations(
-            JsonUtilClient.toLocatedBlocks(json));
+        switch(operation) {
+        case GETFILEBLOCKLOCATIONS:
+          return JsonUtilClient.toBlockLocationArray(json);
+        case GET_BLOCK_LOCATIONS:
+          return DFSUtilClient.locatedBlocks2Locations(
+              JsonUtilClient.toLocatedBlocks(json));
+        default:
+          throw new IOException("Unknown operation " + operation.name());
+        }
       }
     }.run();
   }
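
From an application's point of view the fallback is transparent; block locations over WebHDFS are still obtained through the standard FileSystem API. A minimal sketch, assuming a placeholder NameNode HTTP address and file path:

```java
import java.net.URI;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsBlockLocationsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // webhdfs:// URI pointing at the NameNode HTTP port (placeholder address).
    FileSystem fs = FileSystem.get(
        URI.create("webhdfs://namenode.example.com:50070"), conf);
    Path file = new Path("/user/demo/data.txt");  // hypothetical file

    // Sends op=GETFILEBLOCKLOCATIONS; an older server triggers the
    // GET_BLOCK_LOCATIONS fallback shown above, without any change here.
    BlockLocation[] locations =
        fs.getFileBlockLocations(file, 0, Long.MAX_VALUE);
    for (BlockLocation loc : locations) {
      System.out.println(loc.getOffset() + "+" + loc.getLength()
          + " on " + Arrays.toString(loc.getHosts()));
    }
  }
}
```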

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
index 9169ca8..1321bf6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
@@ -33,8 +33,18 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
     GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
     GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),
 
-    /** GET_BLOCK_LOCATIONS is a private unstable op. */
+    /**
+     * GET_BLOCK_LOCATIONS is a private/stable API op. It returns a
+     * {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks}
+     * json object.
+     */
     GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
+    /**
+     * GETFILEBLOCKLOCATIONS is the public op that complies with
+     * {@link org.apache.hadoop.fs.FileSystem#getFileBlockLocations}
+     * interface.
+     */
+    GETFILEBLOCKLOCATIONS(false, HttpURLConnection.HTTP_OK),
     GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
     GETXATTRS(false, HttpURLConnection.HTTP_OK),
     GETTRASHROOT(false, HttpURLConnection.HTTP_OK),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index e400847..139680c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -54,6 +54,7 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -992,6 +993,21 @@ public class NamenodeWebHdfsMethods {
         return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
       }
     }
+    case GETFILEBLOCKLOCATIONS:
+    {
+      final long offsetValue = offset.getValue();
+      final Long lengthValue = length.getValue();
+
+      FileSystem fs = FileSystem.get(conf != null ?
+          conf : new Configuration());
+      BlockLocation[] locations = fs.getFileBlockLocations(
+          new org.apache.hadoop.fs.Path(fullpath),
+          offsetValue,
+          lengthValue != null ? lengthValue : Long.MAX_VALUE);
+      final String js = JsonUtil.toJsonString("BlockLocations",
+          JsonUtil.toJsonMap(locations));
+      return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+    }
     case GET_BLOCK_LOCATIONS:
     {
       final long offsetValue = offset.getValue();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
index 05a5777..0d89113 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
@@ -463,4 +463,37 @@ public class JsonUtil {
   public static String toJsonString(BlockStoragePolicy storagePolicy) {
     return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy));
   }
+
+  public static Map<String, Object> toJsonMap(BlockLocation[] locations)
+      throws IOException {
+    if(locations == null) {
+      return null;
+    }
+    final Map<String, Object> m = new TreeMap<String, Object>();
+    Object[] blockLocations = new Object[locations.length];
+    for(int i=0; i<locations.length; i++) {
+      blockLocations[i] = toJsonMap(locations[i]);
+    }
+    m.put(BlockLocation.class.getSimpleName(), blockLocations);
+    return m;
+  }
+
+  public static Map<String, Object> toJsonMap(
+      final BlockLocation blockLocation) throws IOException {
+    if (blockLocation == null) {
+      return null;
+    }
+
+    final Map<String, Object> m = new TreeMap<String, Object>();
+    m.put("length", blockLocation.getLength());
+    m.put("offset", blockLocation.getOffset());
+    m.put("corrupt", blockLocation.isCorrupt());
+    m.put("storageTypes", toJsonArray(blockLocation.getStorageTypes()));
+    m.put("storageIds", blockLocation.getStorageIds());
+    m.put("cachedHosts", blockLocation.getCachedHosts());
+    m.put("hosts", blockLocation.getHosts());
+    m.put("names", blockLocation.getNames());
+    m.put("topologyPaths", blockLocation.getTopologyPaths());
+    return m;
+  }
 }
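
To see the wire format these helpers produce, a small sketch that serializes a single hand-built `BlockLocation` the same way the NameNode handler does; the host, storage id and sizes are made-up values:

```java
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.web.JsonUtil;

public class BlockLocationJsonServerDemo {
  public static void main(String[] args) throws Exception {
    // A single-replica block location populated with placeholder values.
    BlockLocation location = new BlockLocation(
        new String[] {"127.0.0.1:50010"},                // names (ip:xferPort)
        new String[] {"localhost"},                      // hosts
        new String[] {},                                 // cachedHosts
        new String[] {"/default-rack/localhost:50010"},  // topologyPaths
        new String[] {"storage-1"},                      // storageIds
        new StorageType[] {StorageType.DISK},            // storageTypes
        0L, 134217728L, false);                          // offset, length, corrupt

    // Same composition the GETFILEBLOCKLOCATIONS handler uses:
    // {"BlockLocations": {"BlockLocation": [ {...} ]}}
    String js = JsonUtil.toJsonString("BlockLocations",
        JsonUtil.toJsonMap(new BlockLocation[] {location}));
    System.out.println(js);
  }
}
```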

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
index f91e89f..27fd13a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
@@ -38,6 +38,7 @@ WebHDFS REST API
         * [Status of a File/Directory](#Status_of_a_FileDirectory)
         * [List a Directory](#List_a_Directory)
         * [Iteratively List a Directory](#Iteratively_List_a_Directory)
+        * [Get File Block Locations](#Get_File_Block_Locations)
     * [Other File System Operations](#Other_File_System_Operations)
         * [Get Content Summary of a 
Directory](#Get_Content_Summary_of_a_Directory)
         * [Get File Checksum](#Get_File_Checksum)
@@ -97,6 +98,9 @@ WebHDFS REST API
         * [BlockStoragePolicy JSON Schema](#BlockStoragePolicy_JSON_Schema)
             * [BlockStoragePolicy Properties](#BlockStoragePolicy_Properties)
         * [BlockStoragePolicies JSON Schema](#BlockStoragePolicies_JSON_Schema)
+        * [BlockLocation JSON Schema](#BlockLocation_JSON_Schema)
+            * [BlockLocation Properties](#BlockLocation_Properties)
+        * [BlockLocations JSON Schema](#BlockLocations_JSON_Schema)
     * [HTTP Query Parameter Dictionary](#HTTP_Query_Parameter_Dictionary)
         * [ACL Spec](#ACL_Spec)
         * [XAttr Name](#XAttr_Name)
@@ -167,6 +171,7 @@ The HTTP REST API supports the complete 
[FileSystem](../../api/org/apache/hadoop
     * [`CHECKACCESS`](#Check_access) (see 
[FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access)
     * [`GETALLSTORAGEPOLICY`](#Get_all_Storage_Policies) (see 
[FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies)
     * [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see 
[FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy)
+    * [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations)
 *   HTTP PUT
     * [`CREATE`](#Create_and_Write_to_a_File) (see 
[FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create)
     * [`MKDIRS`](#Make_a_Directory) (see 
[FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs)
@@ -1141,7 +1146,7 @@ See also: 
[FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor
         {
             "BlockStoragePolicy": {
                 "copyOnCreateFile": false,
-               "creationFallbacks": [],
+                "creationFallbacks": [],
                 "id":7,
                 "name":"HOT",
                 "replicationFallbacks":["ARCHIVE"],
@@ -1151,6 +1156,51 @@ See also: 
[FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor
 
 See also: 
[FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy
 
+### Get File Block Locations
+
+* Submit a HTTP GET request.
+
+        curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILEBLOCKLOCATIONS"
+
+    The client receives a response with a [`BlockLocations` JSON Object](#BlockLocations_JSON_Schema):
+
+        HTTP/1.1 200 OK
+        Content-Type: application/json
+        Transfer-Encoding: chunked
+
+        {
+          "BlockLocations" :
+          {
+            "BlockLocation":
+            [
+              {
+                "cachedHosts" : [],
+                "corrupt" : false,
+                "hosts" : ["host"],
+                "length" : 134217728,                             // length of this block
+                "names" : ["host:ip"],
+                "offset" : 0,                                     // offset of the block in the file
+                "storageIds" : ["storageid"],
+                "storageTypes" : ["DISK"],                        // enum {RAM_DISK, SSD, DISK, ARCHIVE}
+                "topologyPaths" : ["/default-rack/hostname:ip"]
+              }, {
+                "cachedHosts" : [],
+                "corrupt" : false,
+                "hosts" : ["host"],
+                "length" : 62599364,
+                "names" : ["host:ip"],
+                "offset" : 134217728,
+                "storageIds" : ["storageid"],
+                "storageTypes" : ["DISK"],
+                "topologyPaths" : ["/default-rack/hostname:ip"]
+              },
+              ...
+            ]
+          }
+        }
+
+See also: [`offset`](#Offset), [`length`](#Length), [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations
+
 Extended Attributes(XAttrs) Operations
 --------------------------------------
 
@@ -2109,6 +2159,146 @@ A `BlockStoragePolicies` JSON object represents an 
array of `BlockStoragePolicy`
 }
 ```
 
+### BlockLocations JSON Schema
+
+A `BlockLocations` JSON object represents an array of `BlockLocation` JSON objects.
+
+```json
+{
+  "name"      : "BlockLocations",
+  "properties":
+  {
+    "BlockLocations":
+    {
+      "type"      : "object",
+      "properties":
+      {
+        "BlockLocation":
+        {
+          "description": "An array of BlockLocation",
+          "type"       : "array",
+          "items"      : blockLocationProperties      //See BlockLocation Properties
+        }
+      }
+    }
+  }
+}
+```
+
+See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
+
+### BlockLocation JSON Schema
+
+```json
+{
+  "name"      : "BlockLocation",
+  "properties":
+  {
+    "BlockLocation": blockLocationProperties      //See BlockLocation Properties
+  }
+}
+```
+
+See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
+
+#### BlockLocation Properties
+
+JavaScript syntax is used to define `blockLocationProperties` so that it can be referred in both `BlockLocation` and `BlockLocations` JSON schemas.
+
+```javascript
+var blockLocationProperties =
+{
+  "type"      : "object",
+  "properties":
+  {
+    "cachedHosts":
+    {
+      "description": "Datanode hostnames with a cached replica",
+      "type"       : "array",
+      "required"   : "true",
+      "items"      :
+      {
+        "description": "A datanode hostname",
+        "type"       : "string"
+      }
+    },
+    "corrupt":
+    {
+      "description": "True if the block is corrupted",
+      "type"       : "boolean",
+      "required"   : "true"
+    },
+    "hosts":
+    {
+      "description": "Datanode hostnames store the block",
+      "type"       : "array",
+      "required"   : "true",
+      "items"      :
+      {
+        "description": "A datanode hostname",
+        "type"       : "string"
+      }
+    },
+    "length":
+    {
+      "description": "Length of the block",
+      "type"       : "integer",
+      "required"   : "true"
+    },
+    "names":
+    {
+      "description": "Datanode IP:xferPort for accessing the block",
+      "type"       : "array",
+      "required"   : "true",
+      "items"      :
+      {
+        "description": "DatanodeIP:xferPort",
+        "type"       : "string"
+      }
+    },
+    "offset":
+    {
+      "description": "Offset of the block in the file",
+      "type"       : "integer",
+      "required"   : "true"
+    },
+    "storageIds":
+    {
+      "description": "Storage ID of each replica",
+      "type"       : "array",
+      "required"   : "true",
+      "items"      :
+      {
+        "description": "Storage ID",
+        "type"       : "string"
+      }
+    },
+    "storageTypes":
+    {
+      "description": "Storage type of each replica",
+      "type"       : "array",
+      "required"   : "true",
+      "items"      :
+      {
+        "description": "Storage type",
+        "enum"       : ["RAM_DISK", "SSD", "DISK", "ARCHIVE"]
+      }
+    },
+    "topologyPaths":
+    {
+      "description": "Datanode addresses in network topology",
+      "type"       : "array",
+      "required"   : "true",
+      "items"      :
+      {
+        "description": "/rack/host:ip",
+        "type"       : "string"
+      }
+    }
+  }
+};
+```
+
 HTTP Query Parameter Dictionary
 -------------------------------
 

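For completeness, a raw-HTTP sketch of the documented REST call (mirroring the helper in the new test), including the `offset` and `length` query parameters; the NameNode address and path are placeholders:

```java
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

import org.apache.commons.io.IOUtils;

public class GetFileBlockLocationsRestDemo {
  public static void main(String[] args) throws Exception {
    // Placeholder NameNode HTTP address and file path.
    URL url = new URL("http://namenode.example.com:50070"
        + "/webhdfs/v1/user/demo/data.txt"
        + "?op=GETFILEBLOCKLOCATIONS&offset=0&length=1048576");

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.setRequestMethod("GET");
      conn.setInstanceFollowRedirects(false);
      // The body is the BlockLocations JSON object described above.
      try (InputStream in = conn.getInputStream()) {
        System.out.println(IOUtils.toString(in, "UTF-8"));
      }
    } finally {
      conn.disconnect();
    }
  }
}
```
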
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
index d4495dc..1ff04de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
@@ -29,6 +29,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.PrintWriter;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.SocketException;
@@ -38,8 +39,16 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -66,7 +75,11 @@ import org.apache.hadoop.hdfs.TestDFSClientRetries;
 import org.apache.hadoop.hdfs.TestFileCreation;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@@ -77,6 +90,8 @@ import org.apache.hadoop.hdfs.web.resources.LengthParam;
 import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.HttpServerFunctionalTest;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
@@ -93,8 +108,12 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -857,6 +876,76 @@ public class TestWebHDFS {
         Assert.assertTrue(storageTypes != null && storageTypes.length > 0 &&
             storageTypes[0] == StorageType.DISK);
       }
+
+      // Query webhdfs REST API to get block locations
+      InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
+
+      // Case 1
+      // URL without length or offset parameters
+      URL url1 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS");
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url1);
+
+      String response1 = getResponse(url1, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response1);
+      // Parse BlockLocation array from json output using object mapper
+      BlockLocation[] locationArray1 = toBlockLocationArray(response1);
+
+      // Verify that the result from the REST call matches the file system API
+      verifyEquals(locations, locationArray1);
+
+      // Case 2
+      // URL contains length and offset parameters
+      URL url2 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+              + "&length=" + LENGTH + "&offset=" + OFFSET);
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url2);
+
+      String response2 = getResponse(url2, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response2);
+      BlockLocation[] locationArray2 = toBlockLocationArray(response2);
+
+      verifyEquals(locations, locationArray2);
+
+      // Case 3
+      // URL contains length parameter but without offset parameters
+      URL url3 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+              + "&length=" + LENGTH);
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url3);
+
+      String response3 = getResponse(url3, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response3);
+      BlockLocation[] locationArray3 = toBlockLocationArray(response3);
+
+      verifyEquals(locations, locationArray3);
+
+      // Case 4
+      // URL contains offset parameter but without length parameter
+      URL url4 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+              + "&offset=" + OFFSET);
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url4);
+
+      String response4 = getResponse(url4, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response4);
+      BlockLocation[] locationArray4 = toBlockLocationArray(response4);
+
+      verifyEquals(locations, locationArray4);
+
+      // Case 5
+      // URL specifies an offset that exceeds the file length
+      URL url5 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+              + "&offset=1200");
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url5);
+
+      String response5 = getResponse(url5, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response5);
+      BlockLocation[] locationArray5 = toBlockLocationArray(response5);
+
+      // Expected an empty array of BlockLocation
+      verifyEquals(new BlockLocation[] {}, locationArray5);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -864,6 +953,66 @@ public class TestWebHDFS {
     }
   }
 
+  private BlockLocation[] toBlockLocationArray(String json)
+      throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    MapType subType = mapper.getTypeFactory().constructMapType(
+        Map.class,
+        String.class,
+        BlockLocation[].class);
+    MapType rootType = mapper.getTypeFactory().constructMapType(
+        Map.class,
+        mapper.constructType(String.class),
+        mapper.constructType(subType));
+
+    Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
+        .readValue(json, rootType);
+    Map<String, BlockLocation[]> locationMap = jsonMap
+        .get("BlockLocations");
+    BlockLocation[] locationArray = locationMap.get(
+        BlockLocation.class.getSimpleName());
+    return locationArray;
+  }
+
+  private void verifyEquals(BlockLocation[] locations1,
+      BlockLocation[] locations2) throws IOException {
+    Assert.assertEquals(locations1.length, locations2.length);
+    for (int i = 0; i < locations1.length; i++) {
+      BlockLocation location1 = locations1[i];
+      BlockLocation location2 = locations2[i];
+      Assert.assertEquals(location1.getLength(),
+          location2.getLength());
+      Assert.assertEquals(location1.getOffset(),
+          location2.getOffset());
+      Assert.assertArrayEquals(location1.getCachedHosts(),
+          location2.getCachedHosts());
+      Assert.assertArrayEquals(location1.getHosts(),
+          location2.getHosts());
+      Assert.assertArrayEquals(location1.getNames(),
+          location2.getNames());
+      Assert.assertArrayEquals(location1.getStorageIds(),
+          location2.getStorageIds());
+      Assert.assertArrayEquals(location1.getTopologyPaths(),
+          location2.getTopologyPaths());
+      Assert.assertArrayEquals(location1.getStorageTypes(),
+          location2.getStorageTypes());
+    }
+  }
+
+  private static String getResponse(URL url, String httpRequestType)
+      throws IOException {
+    HttpURLConnection conn = null;
+    try {
+      conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestMethod(httpRequestType);
+      conn.setInstanceFollowRedirects(false);
+      return IOUtils.toString(conn.getInputStream());
+    } finally {
+      if(conn != null) {
+        conn.disconnect();
+      }
+    }
+  }
+
   private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
       final URI uri, final String userName) throws Exception {
 
@@ -1220,4 +1369,131 @@ public class TestWebHDFS {
       }
     }
   }
+
+  /**
+   * A mock servlet to handle {@link WebHdfsFileSystem} client
+   * requests. The format of the response depends on how many
+   * times it has been called (1 to 3 times).
+   * <p>
+   * The first call returns a wrapped json response with an
+   * IllegalArgumentException.
+   * <p>
+   * The second call returns a valid GET_BLOCK_LOCATIONS
+   * json response.
+   * <p>
+   * The third call returns a wrapped json response with
+   * a random IOException.
+   */
+  public static class MockWebHdfsServlet extends HttpServlet {
+
+    private static final long serialVersionUID = 1L;
+    private static int respondTimes = 0;
+    private static final String RANDOM_EXCEPTION_MSG =
+        "This is a random exception";
+
+    @Override
+    public void doGet(HttpServletRequest request,
+        HttpServletResponse response) throws ServletException, IOException {
+      response.setHeader("Content-Type",
+          MediaType.APPLICATION_JSON);
+      String param = request.getParameter("op");
+      if (respondTimes == 0) {
+        Exception mockException = new IllegalArgumentException(
+            "Invalid value for webhdfs parameter \"op\". "
+                + "No enum constant " + param);
+        sendException(request, response, mockException);
+      } else if (respondTimes == 1) {
+        sendResponse(request, response);
+      } else if (respondTimes == 2) {
+        Exception mockException = new IOException(RANDOM_EXCEPTION_MSG);
+        sendException(request, response, mockException);
+      }
+      respondTimes++;
+    }
+
+    private void sendResponse(HttpServletRequest request,
+        HttpServletResponse response) throws IOException {
+      response.setStatus(HttpServletResponse.SC_OK);
+      // Construct a LocatedBlock for testing
+      DatanodeInfo d = DFSTestUtil.getLocalDatanodeInfo();
+      DatanodeInfo[] ds = new DatanodeInfo[1];
+      ds[0] = d;
+      ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 121, 1);
+      LocatedBlock l1 = new LocatedBlock(b1, ds);
+      l1.setStartOffset(0);
+      l1.setCorrupt(false);
+      List<LocatedBlock> ls = Arrays.asList(l1);
+      LocatedBlocks locatedblocks =
+          new LocatedBlocks(10, false, ls, l1,
+              true, null, null);
+
+      try (PrintWriter pw = response.getWriter()) {
+        pw.write(JsonUtil.toJsonString(locatedblocks));
+      }
+    }
+
+    private void sendException(HttpServletRequest request,
+        HttpServletResponse response,
+        Exception mockException) throws IOException {
+      response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+      String errJs = JsonUtil.toJsonString(mockException);
+      try (PrintWriter pw = response.getWriter()) {
+        pw.write(errJs);
+      }
+    }
+  }
+
+  @Test
+  public void testGetFileBlockLocationsBackwardsCompatibility()
+      throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
+    HttpServer2 http = null;
+    try {
+      http = HttpServerFunctionalTest.createTestServer(conf);
+      http.addServlet("test", pathSpec, MockWebHdfsServlet.class);
+      http.start();
+
+      // Write the address back to configuration so
+      // WebHdfsFileSystem could connect to the mock server
+      conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
+          "localhost:" + http.getConnectorAddress(0).getPort());
+
+      final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(
+          conf, WebHdfsConstants.WEBHDFS_SCHEME);
+
+      WebHdfsFileSystem spyFs = spy(webFS);
+      BlockLocation[] locations = spyFs
+          .getFileBlockLocations(new Path("p"), 0, 100);
+
+      // Verify result
+      assertEquals(1, locations.length);
+      assertEquals(121, locations[0].getLength());
+
+      // Verify the fallback
+      // The method should be called exactly 2 times:
+      // the 1st call tries GETFILEBLOCKLOCATIONS and finds it is not supported,
+      // the 2nd call falls back to GET_BLOCK_LOCATIONS
+      verify(spyFs, times(2)).getFileBlockLocations(any(),
+          any(), anyLong(), anyLong());
+
+      // Verify it doesn't erroneously fall back.
+      // When the server returns a different error, the client should
+      // throw an exception directly.
+      try {
+        spyFs.getFileBlockLocations(new Path("p"), 0, 100);
+        Assert.fail("Expected an exception for the unrecognized error");
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals(MockWebHdfsServlet.RANDOM_EXCEPTION_MSG, e.getMessage());
+        // In total this method has now been called 3 times
+        verify(spyFs, times(3)).getFileBlockLocations(any(),
+            any(), anyLong(), anyLong());
+      }
+    } finally {
+      if(http != null) {
+        http.stop();
+      }
+    }
+  }
 }

