HDFS-11156. Add new op GETFILEBLOCKLOCATIONS to WebHDFS REST API. Contributed by Weiwei Yang.
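For orientation, here is a minimal client-side sketch of what this patch enables: calling the standard FileSystem#getFileBlockLocations against a webhdfs:// URI, which WebHdfsFileSystem now serves via the new GETFILEBLOCKLOCATIONS op (falling back to the private GET_BLOCK_LOCATIONS op for older servers). The host, port, and file path below are placeholders, not taken from the patch.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetFileBlockLocationsExample {
  public static void main(String[] args) throws Exception {
    // Placeholder NameNode HTTP address; use the cluster's dfs.namenode.http-address value.
    URI webhdfsUri = URI.create("webhdfs://namenode.example.com:9870");
    try (FileSystem fs = FileSystem.get(webhdfsUri, new Configuration())) {
      // Placeholder file path; request block locations for the whole file.
      BlockLocation[] locations =
          fs.getFileBlockLocations(new Path("/user/example/data.bin"), 0, Long.MAX_VALUE);
      for (BlockLocation location : locations) {
        System.out.println("offset=" + location.getOffset()
            + " length=" + location.getLength()
            + " hosts=" + String.join(",", location.getHosts()));
      }
    }
  }
}
```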
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7fcc73fc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7fcc73fc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7fcc73fc Branch: refs/heads/YARN-3926 Commit: 7fcc73fc0d248aae1edbd4e1514c5818f6198928 Parents: b31e195 Author: Andrew Wang <w...@apache.org> Authored: Tue Jan 3 09:58:00 2017 -0800 Committer: Andrew Wang <w...@apache.org> Committed: Tue Jan 3 09:58:00 2017 -0800 ---------------------------------------------------------------------- .../apache/hadoop/hdfs/web/JsonUtilClient.java | 53 ++++ .../hadoop/hdfs/web/WebHdfsFileSystem.java | 62 ++++- .../hadoop/hdfs/web/resources/GetOpParam.java | 12 +- .../web/resources/NamenodeWebHdfsMethods.java | 16 ++ .../org/apache/hadoop/hdfs/web/JsonUtil.java | 33 +++ .../hadoop-hdfs/src/site/markdown/WebHDFS.md | 192 ++++++++++++- .../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 276 +++++++++++++++++++ 7 files changed, 638 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java index 246f242..4204c54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary.Builder; import org.apache.hadoop.fs.FileChecksum; @@ -637,4 +638,56 @@ class JsonUtilClient { } } + static BlockLocation[] toBlockLocationArray(Map<?, ?> json) + throws IOException{ + final Map<?, ?> rootmap = + (Map<?, ?>)json.get(BlockLocation.class.getSimpleName() + "s"); + final List<?> array = JsonUtilClient.getList(rootmap, + BlockLocation.class.getSimpleName()); + + Preconditions.checkNotNull(array); + final BlockLocation[] locations = new BlockLocation[array.size()]; + int i = 0; + for (Object object : array) { + final Map<?, ?> m = (Map<?, ?>) object; + locations[i++] = JsonUtilClient.toBlockLocation(m); + } + return locations; + } + + /** Convert a Json map to BlockLocation. **/ + static BlockLocation toBlockLocation(Map<?, ?> m) + throws IOException{ + if(m == null) { + return null; + } + + long length = ((Number) m.get("length")).longValue(); + long offset = ((Number) m.get("offset")).longValue(); + boolean corrupt = Boolean. 
+ getBoolean(m.get("corrupt").toString()); + String[] storageIds = toStringArray(getList(m, "storageIds")); + String[] cachedHosts = toStringArray(getList(m, "cachedHosts")); + String[] hosts = toStringArray(getList(m, "hosts")); + String[] names = toStringArray(getList(m, "names")); + String[] topologyPaths = toStringArray(getList(m, "topologyPaths")); + StorageType[] storageTypes = toStorageTypeArray( + getList(m, "storageTypes")); + return new BlockLocation(names, hosts, cachedHosts, + topologyPaths, storageIds, storageTypes, + offset, length, corrupt); + } + + static String[] toStringArray(List<?> list) { + if (list == null) { + return null; + } else { + final String[] array = new String[list.size()]; + int i = 0; + for (Object object : list) { + array[i++] = object.toString(); + } + return array; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 26cfc01..d4fa009 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -1611,14 +1611,68 @@ public class WebHdfsFileSystem extends FileSystem final long offset, final long length) throws IOException { statistics.incrementReadOps(1); storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS); + BlockLocation[] locations = null; + try { + locations = getFileBlockLocations( + GetOpParam.Op.GETFILEBLOCKLOCATIONS, + p, offset, length); + } catch (RemoteException e) { + // See the error message from ExceptionHandler + if(e.getMessage() != null && + e.getMessage().contains( + "Invalid value for webhdfs parameter") && + e.getMessage().contains( + GetOpParam.Op.GETFILEBLOCKLOCATIONS.toString())) { + // Old webhdfs server doesn't support GETFILEBLOCKLOCATIONS + // operation, fall back to query again using old API + // GET_BLOCK_LOCATIONS. + LOG.info("Invalid webhdfs operation parameter " + + GetOpParam.Op.GETFILEBLOCKLOCATIONS + ". Fallback to use " + + GetOpParam.Op.GET_BLOCK_LOCATIONS + " instead."); + locations = getFileBlockLocations( + GetOpParam.Op.GET_BLOCK_LOCATIONS, + p, offset, length); + } + } + return locations; + } - final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS; - return new FsPathResponseRunner<BlockLocation[]>(op, p, + /** + * Get file block locations implementation. Provide an operation + * parameter to determine how to get block locations from a webhdfs + * server. Older servers only support <b>GET_BLOCK_LOCATIONS</b> but + * not <b>GETFILEBLOCKLOCATIONS</b>.
+ * + * @param path path to the file + * @param offset start offset in the given file + * @param length length of the range to get locations for + * @param operation + * Valid operation is either + * {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op + * #GET_BLOCK_LOCATIONS} or + * {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op + * #GETFILEBLOCKLOCATIONS} + * @throws IOException + * HTTP connection error, decoding error or given + * operation is not valid + */ + @VisibleForTesting + protected BlockLocation[] getFileBlockLocations( + GetOpParam.Op operation, final Path path, + final long offset, final long length) throws IOException { + return new FsPathResponseRunner<BlockLocation[]>(operation, path, new OffsetParam(offset), new LengthParam(length)) { @Override BlockLocation[] decodeResponse(Map<?,?> json) throws IOException { - return DFSUtilClient.locatedBlocks2Locations( - JsonUtilClient.toLocatedBlocks(json)); + switch(operation) { + case GETFILEBLOCKLOCATIONS: + return JsonUtilClient.toBlockLocationArray(json); + case GET_BLOCK_LOCATIONS: + return DFSUtilClient.locatedBlocks2Locations( + JsonUtilClient.toLocatedBlocks(json)); + default : + throw new IOException("Unknown operation " + operation.name()); + } } }.run(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java index 9169ca8..1321bf6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java @@ -33,8 +33,18 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> { GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK), GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true), - /** GET_BLOCK_LOCATIONS is a private unstable op. */ + /** + * GET_BLOCK_LOCATIONS is a private/stable API op. It returns a + * {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} + * json object. + */ GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK), + /** + * GETFILEBLOCKLOCATIONS is the public op that complies with the + * {@link org.apache.hadoop.fs.FileSystem#getFileBlockLocations} + * interface.
+ */ + GETFILEBLOCKLOCATIONS(false, HttpURLConnection.HTTP_OK), GETACLSTATUS(false, HttpURLConnection.HTTP_OK), GETXATTRS(false, HttpURLConnection.HTTP_OK), GETTRASHROOT(false, HttpURLConnection.HTTP_OK), http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index e400847..139680c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -54,6 +54,7 @@ import javax.ws.rs.core.StreamingOutput; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -992,6 +993,21 @@ public class NamenodeWebHdfsMethods { return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); } } + case GETFILEBLOCKLOCATIONS: + { + final long offsetValue = offset.getValue(); + final Long lengthValue = length.getValue(); + + FileSystem fs = FileSystem.get(conf != null ? + conf : new Configuration()); + BlockLocation[] locations = fs.getFileBlockLocations( + new org.apache.hadoop.fs.Path(fullpath), + offsetValue, + lengthValue != null? 
lengthValue: Long.MAX_VALUE); + final String js = JsonUtil.toJsonString("BlockLocations", + JsonUtil.toJsonMap(locations)); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } case GET_BLOCK_LOCATIONS: { final long offsetValue = offset.getValue(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java index 05a5777..0d89113 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java @@ -463,4 +463,37 @@ public class JsonUtil { public static String toJsonString(BlockStoragePolicy storagePolicy) { return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy)); } + + public static Map<String, Object> toJsonMap(BlockLocation[] locations) + throws IOException { + if(locations == null) { + return null; + } + final Map<String, Object> m = new TreeMap<String, Object>(); + Object[] blockLocations = new Object[locations.length]; + for(int i=0; i<locations.length; i++) { + blockLocations[i] = toJsonMap(locations[i]); + } + m.put(BlockLocation.class.getSimpleName(), blockLocations); + return m; + } + + public static Map<String, Object> toJsonMap( + final BlockLocation blockLocation) throws IOException { + if (blockLocation == null) { + return null; + } + + final Map<String, Object> m = new TreeMap<String, Object>(); + m.put("length", blockLocation.getLength()); + m.put("offset", blockLocation.getOffset()); + m.put("corrupt", blockLocation.isCorrupt()); + m.put("storageTypes", toJsonArray(blockLocation.getStorageTypes())); + m.put("storageIds", blockLocation.getStorageIds()); + m.put("cachedHosts", blockLocation.getCachedHosts()); + m.put("hosts", blockLocation.getHosts()); + m.put("names", blockLocation.getNames()); + m.put("topologyPaths", blockLocation.getTopologyPaths()); + return m; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md index f91e89f..27fd13a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md @@ -38,6 +38,7 @@ WebHDFS REST API * [Status of a File/Directory](#Status_of_a_FileDirectory) * [List a Directory](#List_a_Directory) * [Iteratively List a Directory](#Iteratively_List_a_Directory) + * [Get File Block Locations](#Get_File_Block_Locations) * [Other File System Operations](#Other_File_System_Operations) * [Get Content Summary of a Directory](#Get_Content_Summary_of_a_Directory) * [Get File Checksum](#Get_File_Checksum) @@ -97,6 +98,9 @@ WebHDFS REST API * [BlockStoragePolicy JSON Schema](#BlockStoragePolicy_JSON_Schema) * [BlockStoragePolicy Properties](#BlockStoragePolicy_Properties) * [BlockStoragePolicies JSON Schema](#BlockStoragePolicies_JSON_Schema) + * [BlockLocation JSON Schema](#BlockLocation_JSON_Schema) + * [BlockLocation Properties](#BlockLocation_Properties) + * [BlockLocations JSON 
Schema](#BlockLocations_JSON_Schema) * [HTTP Query Parameter Dictionary](#HTTP_Query_Parameter_Dictionary) * [ACL Spec](#ACL_Spec) * [XAttr Name](#XAttr_Name) @@ -167,6 +171,7 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop * [`CHECKACCESS`](#Check_access) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access) * [`GETALLSTORAGEPOLICY`](#Get_all_Storage_Policies) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies) * [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy) + * [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations) * HTTP PUT * [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create) * [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs) @@ -1141,7 +1146,7 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor { "BlockStoragePolicy": { "copyOnCreateFile": false, - "creationFallbacks": [], + "creationFallbacks": [], "id":7, "name":"HOT", "replicationFallbacks":["ARCHIVE"], @@ -1151,6 +1156,51 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy +### Get File Block Locations + +* Submit an HTTP GET request. + + curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILEBLOCKLOCATIONS" + + The client receives a response with a [`BlockLocations` JSON object](#BlockLocations_JSON_Schema): + + HTTP/1.1 200 OK + Content-Type: application/json + Transfer-Encoding: chunked + + { + "BlockLocations" : + { + "BlockLocation": + [ + { + "cachedHosts" : [], + "corrupt" : false, + "hosts" : ["host"], + "length" : 134217728, // length of this block + "names" : ["host:ip"], + "offset" : 0, // offset of the block in the file + "storageIds" : ["storageid"], + "storageTypes" : ["DISK"], // enum {RAM_DISK, SSD, DISK, ARCHIVE} + "topologyPaths" : ["/default-rack/hostname:ip"] + }, { + "cachedHosts" : [], + "corrupt" : false, + "hosts" : ["host"], + "length" : 62599364, + "names" : ["host:ip"], + "offset" : 134217728, + "storageIds" : ["storageid"], + "storageTypes" : ["DISK"], + "topologyPaths" : ["/default-rack/hostname:ip"] + }, + ... + ] + } + } + +See also: [`offset`](#Offset), [`length`](#Length), [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations + Extended Attributes(XAttrs) Operations -------------------------------------- @@ -2109,6 +2159,146 @@ A `BlockStoragePolicies` JSON object represents an array of `BlockStoragePolicy` } ``` +#### BlockLocations JSON Schema + +A `BlockLocations` JSON object represents an array of `BlockLocation` JSON objects.
+ +```json +{ + "name" : "BlockLocations", + "properties": + { + "BlockLocations": + { + "type" : "object", + "properties": + { + "BlockLocation": + { + "description": "An array of BlockLocation", + "type" : "array", + "items" : blockLocationProperties //See BlockLocation Properties + } + } + } + } +} +``` + +See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html) + +### BlockLocation JSON Schema + +```json +{ + "name" : "BlockLocation", + "properties": + { + "BlockLocation": blockLocationProperties //See BlockLocation Properties + } +} +``` + +See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html) + +#### BlockLocation Properties + +JavaScript syntax is used to define `blockLocationProperties` so that it can be referred in both `BlockLocation` and `BlockLocations` JSON schemas. + +```javascript +var blockLocationProperties = +{ + "type" : "object", + "properties": + { + "cachedHosts": + { + "description": "Datanode hostnames with a cached replica", + "type" : "array", + "required" : "true", + "items" : + { + "description": "A datanode hostname", + "type" : "string" + } + }, + "corrupt": + { + "description": "True if the block is corrupted", + "type" : "boolean", + "required" : "true" + }, + "hosts": + { + "description": "Datanode hostnames store the block", + "type" : "array", + "required" : "true", + "items" : + { + "description": "A datanode hostname", + "type" : "string" + } + }, + "length": + { + "description": "Length of the block", + "type" : "integer", + "required" : "true" + }, + "names": + { + "description": "Datanode IP:xferPort for accessing the block", + "type" : "array", + "required" : "true", + "items" : + { + "description": "DatanodeIP:xferPort", + "type" : "string" + } + }, + "offset": + { + "description": "Offset of the block in the file", + "type" : "integer", + "required" : "true" + }, + "storageIds": + { + "description": "Storage ID of each replica", + "type" : "array", + "required" : "true", + "items" : + { + "description": "Storage ID", + "type" : "string" + } + }, + "storageTypes": + { + "description": "Storage type of each replica", + "type" : "array", + "required" : "true", + "items" : + { + "description": "Storage type", + "enum" : ["RAM_DISK", "SSD", "DISK", "ARCHIVE"] + } + }, + "topologyPaths": + { + "description": "Datanode addresses in network topology", + "type" : "array", + "required" : "true", + "items" : + { + "description": "/rack/host:ip", + "type" : "string" + } + } + } +}; +``` + HTTP Query Parameter Dictionary ------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fcc73fc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java index d4495dc..1ff04de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java @@ -29,6 +29,7 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import 
java.io.OutputStream; +import java.io.PrintWriter; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.SocketException; @@ -38,8 +39,16 @@ import java.net.URISyntaxException; import java.net.URL; import java.security.PrivilegedExceptionAction; import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Random; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.MediaType; + import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,7 +75,11 @@ import org.apache.hadoop.hdfs.TestDFSClientRetries; import org.apache.hadoop.hdfs.TestFileCreation; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; @@ -77,6 +90,8 @@ import org.apache.hadoop.hdfs.web.resources.LengthParam; import org.apache.hadoop.hdfs.web.resources.NoRedirectParam; import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.Param; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.http.HttpServerFunctionalTest; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; @@ -93,8 +108,12 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.MapType; + import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; @@ -857,6 +876,76 @@ public class TestWebHDFS { Assert.assertTrue(storageTypes != null && storageTypes.length > 0 && storageTypes[0] == StorageType.DISK); } + + // Query webhdfs REST API to get block locations + InetSocketAddress addr = cluster.getNameNode().getHttpAddress(); + + // Case 1 + // URL without length or offset parameters + URL url1 = new URL("http", addr.getHostString(), addr.getPort(), + WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"); + LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url1); + + String response1 = getResponse(url1, "GET"); + LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response1); + // Parse BlockLocation array from json output using object mapper + BlockLocation[] locationArray1 = toBlockLocationArray(response1); + + // Verify the result from rest call is same as file system api + verifyEquals(locations, locationArray1); + + // Case 2 + // URL contains length and offset parameters + URL url2 = new URL("http", addr.getHostString(), addr.getPort(), + WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS" + + "&length=" + LENGTH + 
"&offset=" + OFFSET); + LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url2); + + String response2 = getResponse(url2, "GET"); + LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response2); + BlockLocation[] locationArray2 = toBlockLocationArray(response2); + + verifyEquals(locations, locationArray2); + + // Case 3 + // URL contains length parameter but without offset parameters + URL url3 = new URL("http", addr.getHostString(), addr.getPort(), + WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS" + + "&length=" + LENGTH); + LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url3); + + String response3 = getResponse(url3, "GET"); + LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response3); + BlockLocation[] locationArray3 = toBlockLocationArray(response3); + + verifyEquals(locations, locationArray3); + + // Case 4 + // URL contains offset parameter but without length parameter + URL url4 = new URL("http", addr.getHostString(), addr.getPort(), + WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS" + + "&offset=" + OFFSET); + LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url4); + + String response4 = getResponse(url4, "GET"); + LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response4); + BlockLocation[] locationArray4 = toBlockLocationArray(response4); + + verifyEquals(locations, locationArray4); + + // Case 5 + // URL specifies offset exceeds the file length + URL url5 = new URL("http", addr.getHostString(), addr.getPort(), + WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS" + + "&offset=1200"); + LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url5); + + String response5 = getResponse(url5, "GET"); + LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response5); + BlockLocation[] locationArray5 = toBlockLocationArray(response5); + + // Expected an empty array of BlockLocation + verifyEquals(new BlockLocation[] {}, locationArray5); } finally { if (cluster != null) { cluster.shutdown(); @@ -864,6 +953,66 @@ public class TestWebHDFS { } } + private BlockLocation[] toBlockLocationArray(String json) + throws IOException { + ObjectMapper mapper = new ObjectMapper(); + MapType subType = mapper.getTypeFactory().constructMapType( + Map.class, + String.class, + BlockLocation[].class); + MapType rootType = mapper.getTypeFactory().constructMapType( + Map.class, + mapper.constructType(String.class), + mapper.constructType(subType)); + + Map<String, Map<String, BlockLocation[]>> jsonMap = mapper + .readValue(json, rootType); + Map<String, BlockLocation[]> locationMap = jsonMap + .get("BlockLocations"); + BlockLocation[] locationArray = locationMap.get( + BlockLocation.class.getSimpleName()); + return locationArray; + } + + private void verifyEquals(BlockLocation[] locations1, + BlockLocation[] locations2) throws IOException { + for(int i=0; i<locations1.length; i++) { + BlockLocation location1 = locations1[i]; + BlockLocation location2 = locations2[i]; + Assert.assertEquals(location1.getLength(), + location2.getLength()); + Assert.assertEquals(location1.getOffset(), + location2.getOffset()); + Assert.assertArrayEquals(location1.getCachedHosts(), + location2.getCachedHosts()); + Assert.assertArrayEquals(location1.getHosts(), + location2.getHosts()); + Assert.assertArrayEquals(location1.getNames(), + location2.getNames()); + Assert.assertArrayEquals(location1.getStorageIds(), + location2.getStorageIds()); + Assert.assertArrayEquals(location1.getTopologyPaths(), + location2.getTopologyPaths()); + 
Assert.assertArrayEquals(location1.getStorageTypes(), + location2.getStorageTypes()); + } + } + + private static String getResponse(URL url, String httpRequestType) + throws IOException { + HttpURLConnection conn = null; + try { + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod(httpRequestType); + conn.setInstanceFollowRedirects(false); + return IOUtils.toString(conn.getInputStream()); + } finally { + if(conn != null) { + conn.disconnect(); + } + } + } + private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf, final URI uri, final String userName) throws Exception { @@ -1220,4 +1369,131 @@ public class TestWebHDFS { } } } + + /** + * A mock class to handle the {@link WebHdfsFileSystem} client + * request. The format of the response depends on how many of + * times it gets called (1 to 3 times). + * <p> + * First time call it return a wrapped json response with a + * IllegalArgumentException + * <p> + * Second time call it return a valid GET_BLOCK_LOCATIONS + * json response + * <p> + * Third time call it return a wrapped json response with + * a random IOException + * + */ + public static class MockWebHdfsServlet extends HttpServlet { + + private static final long serialVersionUID = 1L; + private static int respondTimes = 0; + private static final String RANDOM_EXCEPTION_MSG = + "This is a random exception"; + + @Override + public void doGet(HttpServletRequest request, + HttpServletResponse response) throws ServletException, IOException { + response.setHeader("Content-Type", + MediaType.APPLICATION_JSON); + String param = request.getParameter("op"); + if(respondTimes == 0) { + Exception mockException = new IllegalArgumentException( + "Invalid value for webhdfs parameter \"op\". " + + "" + "No enum constant " + param); + sendException(request, response, mockException); + } else if (respondTimes == 1) { + sendResponse(request, response); + } else if (respondTimes == 2) { + Exception mockException = new IOException(RANDOM_EXCEPTION_MSG); + sendException(request, response, mockException); + } + respondTimes++; + } + + private void sendResponse(HttpServletRequest request, + HttpServletResponse response) throws IOException { + response.setStatus(HttpServletResponse.SC_OK); + // Construct a LocatedBlock for testing + DatanodeInfo d = DFSTestUtil.getLocalDatanodeInfo(); + DatanodeInfo[] ds = new DatanodeInfo[1]; + ds[0] = d; + ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 121, 1); + LocatedBlock l1 = new LocatedBlock(b1, ds); + l1.setStartOffset(0); + l1.setCorrupt(false); + List<LocatedBlock> ls = Arrays.asList(l1); + LocatedBlocks locatedblocks = + new LocatedBlocks(10, false, ls, l1, + true, null, null); + + try (PrintWriter pw = response.getWriter()) { + pw.write(JsonUtil.toJsonString(locatedblocks)); + } + } + + private void sendException(HttpServletRequest request, + HttpServletResponse response, + Exception mockException) throws IOException { + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + String errJs = JsonUtil.toJsonString(mockException); + try (PrintWriter pw = response.getWriter()) { + pw.write(errJs); + } + } + } + + @Test + public void testGetFileBlockLocationsBackwardsCompatibility() + throws Exception { + final Configuration conf = WebHdfsTestUtil.createConf(); + final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*"; + HttpServer2 http = null; + try { + http = HttpServerFunctionalTest.createTestServer(conf); + http.addServlet("test", pathSpec, MockWebHdfsServlet.class); + http.start(); + + // Write the address back to 
configuration so // WebHdfsFileSystem could connect to the mock server + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, + "localhost:" + http.getConnectorAddress(0).getPort()); + + final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem( + conf, WebHdfsConstants.WEBHDFS_SCHEME); + + WebHdfsFileSystem spyFs = spy(webFS); + BlockLocation[] locations = spyFs + .getFileBlockLocations(new Path("p"), 0, 100); + + // Verify result + assertEquals(1, locations.length); + assertEquals(121, locations[0].getLength()); + + // Verify the fall back + // The function should be called exactly 2 times + // 1st time handles GETFILEBLOCKLOCATIONS and finds it is not supported + // 2nd time falls back to handle GET_BLOCK_LOCATIONS + verify(spyFs, times(2)).getFileBlockLocations(any(), + any(), anyLong(), anyLong()); + + // Verify it doesn't erroneously fall back + // When the server returns a different error, it should directly + // throw an exception. + try { + spyFs.getFileBlockLocations(new Path("p"), 0, 100); + } catch (Exception e) { + assertTrue(e instanceof IOException); + assertEquals(e.getMessage(), MockWebHdfsServlet.RANDOM_EXCEPTION_MSG); + // In total this function has been called 3 times + verify(spyFs, times(3)).getFileBlockLocations(any(), + any(), anyLong(), anyLong()); + } + } finally { + if(http != null) { + http.stop(); + } + } + } }
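As a closing illustration, here is a small Jackson-based sketch of consuming the `BlockLocations` JSON documented above from outside the HDFS client. The class and method names are illustrative only; within the client, JsonUtilClient.toBlockLocationArray is the decoder this patch adds.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BlockLocationsJsonReader {
  /**
   * Prints each block in a GETFILEBLOCKLOCATIONS response body whose layout
   * is {"BlockLocations": {"BlockLocation": [ ... ]}} as documented above.
   */
  public static void printBlockLocations(String responseBody) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    JsonNode blocks = mapper.readTree(responseBody)
        .path("BlockLocations")
        .path("BlockLocation");
    for (JsonNode block : blocks) {
      // Collect the datanode hostnames of this block into a comma-separated list.
      StringBuilder hosts = new StringBuilder();
      for (JsonNode host : block.path("hosts")) {
        if (hosts.length() > 0) {
          hosts.append(',');
        }
        hosts.append(host.asText());
      }
      System.out.println("offset=" + block.path("offset").asLong()
          + " length=" + block.path("length").asLong()
          + " corrupt=" + block.path("corrupt").asBoolean()
          + " hosts=" + hosts);
    }
  }
}
```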