Author: szetszwo
Date: Sat Aug 16 21:02:21 2014
New Revision: 1618417

URL: http://svn.apache.org/r1618417
Log:
Merge r1609845 through r1618416 from trunk.
Removed:
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/ProxyUser.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/security/ProxyUserService.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/wsrs/UserProvider.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestProxyUserService.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/wsrs/TestUserProvider.java
Modified:
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSAuthenticationFilter.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServerWebApp.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/resources/httpfs-default.xml
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml

Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1617377-1618416

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSAuthenticationFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSAuthenticationFilter.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSAuthenticationFilter.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSAuthenticationFilter.java Sat Aug 16 21:02:21 2014
@@ -91,4 +91,14 @@ public class HttpFSAuthenticationFilter
     return props;
   }
 
+  protected Configuration getProxyuserConfiguration(FilterConfig filterConfig) {
+    Map<String, String> proxyuserConf = HttpFSServerWebApp.get().getConfig().
+        getValByRegex("httpfs\\.proxyuser\\.");
+    Configuration conf = new Configuration(false);
+    for (Map.Entry<String, String> entry : proxyuserConf.entrySet()) {
+      conf.set(entry.getKey().substring("httpfs.".length()), entry.getValue());
+    }
+    return conf;
+  }
+
 }
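The getProxyuserConfiguration() hook added above strips the "httpfs." prefix so the common proxyuser machinery receives the "proxyuser.*" keys it expects. A minimal standalone sketch of just that key mapping, using plain collections; the "knox" proxy user and its values are hypothetical examples:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the key-prefix mapping performed by getProxyuserConfiguration().
    public class ProxyuserKeyMappingSketch {
      public static void main(String[] args) {
        Map<String, String> httpfsConf = new HashMap<String, String>();
        httpfsConf.put("httpfs.proxyuser.knox.hosts", "gateway.example.com");
        httpfsConf.put("httpfs.proxyuser.knox.groups", "analysts");

        Map<String, String> proxyuserConf = new HashMap<String, String>();
        for (Map.Entry<String, String> e : httpfsConf.entrySet()) {
          // Drop the "httpfs." prefix; the shared filter expects "proxyuser.*" keys.
          proxyuserConf.put(e.getKey().substring("httpfs.".length()), e.getValue());
        }
        for (Map.Entry<String, String> e : proxyuserConf.entrySet()) {
          System.out.println(e.getKey() + "=" + e.getValue()); // proxyuser.knox.*
        }
      }
    }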
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java Sat Aug 16 21:02:21 2014
@@ -30,8 +30,6 @@ import org.apache.hadoop.lib.wsrs.Param;
 import org.apache.hadoop.lib.wsrs.ParametersProvider;
 import org.apache.hadoop.lib.wsrs.ShortParam;
 import org.apache.hadoop.lib.wsrs.StringParam;
-import org.apache.hadoop.lib.wsrs.UserProvider;
-import org.slf4j.MDC;
 
 import javax.ws.rs.ext.Provider;
 import java.util.HashMap;
@@ -53,57 +51,44 @@ public class HttpFSParametersProvider ex
   static {
     PARAMS_DEF.put(Operation.OPEN,
-      new Class[]{DoAsParam.class, OffsetParam.class, LenParam.class});
-    PARAMS_DEF.put(Operation.GETFILESTATUS, new Class[]{DoAsParam.class});
-    PARAMS_DEF.put(Operation.LISTSTATUS,
-      new Class[]{DoAsParam.class, FilterParam.class});
-    PARAMS_DEF.put(Operation.GETHOMEDIRECTORY, new Class[]{DoAsParam.class});
-    PARAMS_DEF.put(Operation.GETCONTENTSUMMARY, new Class[]{DoAsParam.class});
-    PARAMS_DEF.put(Operation.GETFILECHECKSUM, new Class[]{DoAsParam.class});
-    PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS,
-      new Class[]{DoAsParam.class});
-    PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{DoAsParam.class});
-    PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{DoAsParam.class});
-    PARAMS_DEF.put(Operation.APPEND,
-      new Class[]{DoAsParam.class, DataParam.class});
+      new Class[]{OffsetParam.class, LenParam.class});
+    PARAMS_DEF.put(Operation.GETFILESTATUS, new Class[]{});
+    PARAMS_DEF.put(Operation.LISTSTATUS, new Class[]{FilterParam.class});
+    PARAMS_DEF.put(Operation.GETHOMEDIRECTORY, new Class[]{});
+    PARAMS_DEF.put(Operation.GETCONTENTSUMMARY, new Class[]{});
+    PARAMS_DEF.put(Operation.GETFILECHECKSUM, new Class[]{});
+    PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS, new Class[]{});
+    PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{});
+    PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});
+    PARAMS_DEF.put(Operation.APPEND, new Class[]{DataParam.class});
     PARAMS_DEF.put(Operation.CONCAT, new Class[]{SourcesParam.class});
     PARAMS_DEF.put(Operation.CREATE,
-      new Class[]{DoAsParam.class, PermissionParam.class, OverwriteParam.class,
+      new Class[]{PermissionParam.class, OverwriteParam.class,
         ReplicationParam.class, BlockSizeParam.class, DataParam.class});
-    PARAMS_DEF.put(Operation.MKDIRS,
-      new Class[]{DoAsParam.class, PermissionParam.class});
-    PARAMS_DEF.put(Operation.RENAME,
-      new Class[]{DoAsParam.class, DestinationParam.class});
+    PARAMS_DEF.put(Operation.MKDIRS, new Class[]{PermissionParam.class});
+    PARAMS_DEF.put(Operation.RENAME, new Class[]{DestinationParam.class});
     PARAMS_DEF.put(Operation.SETOWNER,
-      new Class[]{DoAsParam.class, OwnerParam.class, GroupParam.class});
-    PARAMS_DEF.put(Operation.SETPERMISSION,
-      new Class[]{DoAsParam.class, PermissionParam.class});
+      new Class[]{OwnerParam.class, GroupParam.class});
+    PARAMS_DEF.put(Operation.SETPERMISSION, new Class[]{PermissionParam.class});
     PARAMS_DEF.put(Operation.SETREPLICATION,
-      new Class[]{DoAsParam.class, ReplicationParam.class});
+      new Class[]{ReplicationParam.class});
     PARAMS_DEF.put(Operation.SETTIMES,
-      new Class[]{DoAsParam.class, ModifiedTimeParam.class,
-        AccessTimeParam.class});
-    PARAMS_DEF.put(Operation.DELETE,
-      new Class[]{DoAsParam.class, RecursiveParam.class});
-    PARAMS_DEF.put(Operation.SETACL,
-      new Class[]{DoAsParam.class, AclPermissionParam.class});
-    PARAMS_DEF.put(Operation.REMOVEACL,
-      new Class[]{DoAsParam.class});
+      new Class[]{ModifiedTimeParam.class, AccessTimeParam.class});
+    PARAMS_DEF.put(Operation.DELETE, new Class[]{RecursiveParam.class});
+    PARAMS_DEF.put(Operation.SETACL, new Class[]{AclPermissionParam.class});
+    PARAMS_DEF.put(Operation.REMOVEACL, new Class[]{});
     PARAMS_DEF.put(Operation.MODIFYACLENTRIES,
-      new Class[]{DoAsParam.class, AclPermissionParam.class});
+      new Class[]{AclPermissionParam.class});
     PARAMS_DEF.put(Operation.REMOVEACLENTRIES,
-      new Class[]{DoAsParam.class, AclPermissionParam.class});
-    PARAMS_DEF.put(Operation.REMOVEDEFAULTACL,
-      new Class[]{DoAsParam.class});
+      new Class[]{AclPermissionParam.class});
+    PARAMS_DEF.put(Operation.REMOVEDEFAULTACL, new Class[]{});
     PARAMS_DEF.put(Operation.SETXATTR,
-      new Class[]{DoAsParam.class, XAttrNameParam.class, XAttrValueParam.class,
+      new Class[]{XAttrNameParam.class, XAttrValueParam.class,
        XAttrSetFlagParam.class});
-    PARAMS_DEF.put(Operation.REMOVEXATTR,
-      new Class[]{DoAsParam.class, XAttrNameParam.class});
+    PARAMS_DEF.put(Operation.REMOVEXATTR, new Class[]{XAttrNameParam.class});
     PARAMS_DEF.put(Operation.GETXATTRS,
-      new Class[]{DoAsParam.class, XAttrNameParam.class, XAttrEncodingParam.class});
-    PARAMS_DEF.put(Operation.LISTXATTRS,
-      new Class[]{DoAsParam.class});
+      new Class[]{XAttrNameParam.class, XAttrEncodingParam.class});
+    PARAMS_DEF.put(Operation.LISTXATTRS, new Class[]{});
   }
 
   public HttpFSParametersProvider() {
@@ -206,41 +191,6 @@ public class HttpFSParametersProvider ex
   }
 
   /**
-   * Class for do-as parameter.
-   */
-  @InterfaceAudience.Private
-  public static class DoAsParam extends StringParam {
-
-    /**
-     * Parameter name.
-     */
-    public static final String NAME = HttpFSFileSystem.DO_AS_PARAM;
-
-    /**
-     * Constructor.
-     */
-    public DoAsParam() {
-      super(NAME, null, UserProvider.getUserPattern());
-    }
-
-    /**
-     * Delegates to parent and then adds do-as user to
-     * MDC context for logging purposes.
-     *
-     *
-     * @param str parameter value.
-     *
-     * @return parsed parameter
-     */
-    @Override
-    public String parseParam(String str) {
-      String doAs = super.parseParam(str);
-      MDC.put(getName(), (doAs != null) ? doAs : "-");
-      return doAs;
-    }
-  }
-
-  /**
    * Class for filter parameter.
    */
   @InterfaceAudience.Private
@@ -275,7 +225,7 @@ public class HttpFSParametersProvider ex
      * Constructor.
      */
     public GroupParam() {
-      super(NAME, null, UserProvider.getUserPattern());
+      super(NAME, null);
     }
   }
 
@@ -371,7 +321,7 @@ public class HttpFSParametersProvider ex
      * Constructor.
     */
    public OwnerParam() {
-      super(NAME, null, UserProvider.getUserPattern());
+      super(NAME, null);
    }
  }
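Removing DoAsParam from PARAMS_DEF does not change the wire protocol: callers still pass the do-as user as a query parameter, which is now consumed by the shared authentication filter before the resource method runs. A hedged client-side sketch (hypothetical host, path, and users; assumes the standard "doas" parameter name behind HttpFSFileSystem.DO_AS_PARAM):

    import java.net.HttpURLConnection;
    import java.net.URL;

    // Client view of an HttpFS request with impersonation. Only the
    // server-side plumbing moved; the query string is unchanged here.
    public class DoAsRequestSketch {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://httpfs.example.com:14000/webhdfs/v1/tmp/f.txt"
            + "?op=GETFILESTATUS&user.name=proxy&doas=alice");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        System.out.println("HTTP " + conn.getResponseCode());
      }
    }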
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java Sat Aug 16 21:02:21 2014
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.http.server.
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DataParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DestinationParam;
-import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DoAsParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.FilterParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.GroupParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.LenParam;
@@ -50,12 +49,11 @@ import org.apache.hadoop.lib.service.Fil
 import org.apache.hadoop.lib.service.FileSystemAccessException;
 import org.apache.hadoop.lib.service.Groups;
 import org.apache.hadoop.lib.service.Instrumentation;
-import org.apache.hadoop.lib.service.ProxyUser;
 import org.apache.hadoop.lib.servlet.FileSystemReleaseFilter;
-import org.apache.hadoop.lib.servlet.HostnameFilter;
 import org.apache.hadoop.lib.wsrs.InputStreamEntity;
 import org.apache.hadoop.lib.wsrs.Parameters;
-import org.apache.hadoop.security.authentication.server.AuthenticationToken;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.web.HttpUserGroupInformation;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +77,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.security.AccessControlException;
-import java.security.Principal;
 import java.text.MessageFormat;
 import java.util.EnumSet;
 import java.util.List;
@@ -97,48 +94,10 @@ public class HttpFSServer {
   private static Logger AUDIT_LOG = LoggerFactory.getLogger("httpfsaudit");
 
   /**
-   * Resolves the effective user that will be used to request a FileSystemAccess filesystem.
-   * <p/>
-   * If the doAs-user is NULL or the same as the user, it returns the user.
-   * <p/>
-   * Otherwise it uses proxyuser rules (see {@link ProxyUser} to determine if the
-   * current user can impersonate the doAs-user.
-   * <p/>
-   * If the current user cannot impersonate the doAs-user an
-   * <code>AccessControlException</code> will be thrown.
-   *
-   * @param user principal for whom the filesystem instance is.
-   * @param doAs do-as user, if any.
-   *
-   * @return the effective user.
-   *
-   * @throws IOException thrown if an IO error occurrs.
-   * @throws AccessControlException thrown if the current user cannot impersonate
-   * the doAs-user.
-   */
-  private String getEffectiveUser(Principal user, String doAs) throws IOException {
-    String effectiveUser = user.getName();
-    if (doAs != null && !doAs.equals(user.getName())) {
-      ProxyUser proxyUser = HttpFSServerWebApp.get().get(ProxyUser.class);
-      String proxyUserName;
-      if (user instanceof AuthenticationToken) {
-        proxyUserName = ((AuthenticationToken)user).getUserName();
-      } else {
-        proxyUserName = user.getName();
-      }
-      proxyUser.validate(proxyUserName, HostnameFilter.get(), doAs);
-      effectiveUser = doAs;
-      AUDIT_LOG.info("Proxy user [{}] DoAs user [{}]", proxyUserName, doAs);
-    }
-    return effectiveUser;
-  }
-
-  /**
    * Executes a {@link FileSystemAccess.FileSystemExecutor} using a filesystem for the effective
    * user.
    *
-   * @param user principal making the request.
-   * @param doAs do-as user, if any.
+   * @param ugi user making the request.
    * @param executor FileSystemExecutor to execute.
    *
    * @return FileSystemExecutor response
@@ -147,12 +106,11 @@ public class HttpFSServer {
    * @throws FileSystemAccessException thrown if a FileSystemAccess releated error occurred. Thrown
    * exceptions are handled by {@link HttpFSExceptionProvider}.
    */
-  private <T> T fsExecute(Principal user, String doAs, FileSystemAccess.FileSystemExecutor<T> executor)
+  private <T> T fsExecute(UserGroupInformation ugi, FileSystemAccess.FileSystemExecutor<T> executor)
     throws IOException, FileSystemAccessException {
-    String hadoopUser = getEffectiveUser(user, doAs);
     FileSystemAccess fsAccess = HttpFSServerWebApp.get().get(FileSystemAccess.class);
     Configuration conf = HttpFSServerWebApp.get().get(FileSystemAccess.class).getFileSystemConfiguration();
-    return fsAccess.execute(hadoopUser, conf, executor);
+    return fsAccess.execute(ugi.getShortUserName(), conf, executor);
   }
 
   /**
@@ -162,8 +120,7 @@ public class HttpFSServer {
    * If a do-as user is specified, the current user must be a valid proxyuser, otherwise an
    * <code>AccessControlException</code> will be thrown.
    *
-   * @param user principal for whom the filesystem instance is.
-   * @param doAs do-as user, if any.
+   * @param ugi principal for whom the filesystem instance is.
    *
    * @return a filesystem for the specified user or do-as user.
    *
@@ -172,8 +129,9 @@ public class HttpFSServer {
    * @throws FileSystemAccessException thrown if a FileSystemAccess releated error occurred. Thrown
    * exceptions are handled by {@link HttpFSExceptionProvider}.
    */
-  private FileSystem createFileSystem(Principal user, String doAs) throws IOException, FileSystemAccessException {
-    String hadoopUser = getEffectiveUser(user, doAs);
+  private FileSystem createFileSystem(UserGroupInformation ugi)
+      throws IOException, FileSystemAccessException {
+    String hadoopUser = ugi.getShortUserName();
     FileSystemAccess fsAccess = HttpFSServerWebApp.get().get(FileSystemAccess.class);
     Configuration conf = HttpFSServerWebApp.get().get(FileSystemAccess.class).getFileSystemConfiguration();
     FileSystem fs = fsAccess.createFileSystem(hadoopUser, conf);
@@ -192,7 +150,6 @@ public class HttpFSServer {
   /**
    * Special binding for '/' as it is not handled by the wildcard binding.
    *
-   * @param user the principal of the user making the request.
    * @param op the HttpFS operation of the request.
    * @param params the HttpFS parameters of the request.
    *
@@ -206,11 +163,10 @@
    */
   @GET
   @Produces(MediaType.APPLICATION_JSON)
-  public Response getRoot(@Context Principal user,
-                          @QueryParam(OperationParam.NAME) OperationParam op,
+  public Response getRoot(@QueryParam(OperationParam.NAME) OperationParam op,
                           @Context Parameters params)
     throws IOException, FileSystemAccessException {
-    return get(user, "", op, params);
+    return get("", op, params);
   }
 
   private String makeAbsolute(String path) {
@@ -220,7 +176,6 @@ public class HttpFSServer {
   /**
    * Binding to handle GET requests, supported operations are
    *
-   * @param user the principal of the user making the request.
    * @param path the path for operation.
    * @param op the HttpFS operation of the request.
    * @param params the HttpFS parameters of the request.
@@ -236,21 +191,20 @@ public class HttpFSServer {
   @GET
   @Path("{path:.*}")
   @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
-  public Response get(@Context Principal user,
-                      @PathParam("path") String path,
+  public Response get(@PathParam("path") String path,
                       @QueryParam(OperationParam.NAME) OperationParam op,
                       @Context Parameters params)
     throws IOException, FileSystemAccessException {
+    UserGroupInformation user = HttpUserGroupInformation.get();
     Response response;
     path = makeAbsolute(path);
    MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name());
-    String doAs = params.get(DoAsParam.NAME, DoAsParam.class);
    switch (op.value()) {
      case OPEN: {
        //Invoking the command directly using an unmanaged FileSystem that is
        // released by the FileSystemReleaseFilter
        FSOperations.FSOpen command = new FSOperations.FSOpen(path);
-        FileSystem fs = createFileSystem(user, doAs);
+        FileSystem fs = createFileSystem(user);
        InputStream is = command.execute(fs);
        Long offset = params.get(OffsetParam.NAME, OffsetParam.class);
        Long len = params.get(LenParam.NAME, LenParam.class);
@@ -264,7 +218,7 @@ public class HttpFSServer {
      case GETFILESTATUS: {
        FSOperations.FSFileStatus command =
          new FSOperations.FSFileStatus(path);
-        Map json = fsExecute(user, doAs, command);
+        Map json = fsExecute(user, command);
        AUDIT_LOG.info("[{}]", path);
        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
        break;
@@ -273,7 +227,7 @@ public class HttpFSServer {
        String filter = params.get(FilterParam.NAME, FilterParam.class);
        FSOperations.FSListStatus command = new FSOperations.FSListStatus(
          path, filter);
-        Map json = fsExecute(user, doAs, command);
+        Map json = fsExecute(user, command);
        AUDIT_LOG.info("[{}] filter [{}]", path,
                       (filter != null) ? filter : "-");
        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
@@ -282,7 +236,7 @@ public class HttpFSServer {
      case GETHOMEDIRECTORY: {
        enforceRootPath(op.value(), path);
        FSOperations.FSHomeDir command = new FSOperations.FSHomeDir();
-        JSONObject json = fsExecute(user, doAs, command);
+        JSONObject json = fsExecute(user, command);
        AUDIT_LOG.info("");
        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
        break;
@@ -290,7 +244,7 @@ public class HttpFSServer {
      case INSTRUMENTATION: {
        enforceRootPath(op.value(), path);
        Groups groups = HttpFSServerWebApp.get().get(Groups.class);
-        List<String> userGroups = groups.getGroups(user.getName());
+        List<String> userGroups = groups.getGroups(user.getShortUserName());
        if (!userGroups.contains(HttpFSServerWebApp.get().getAdminGroup())) {
          throw new AccessControlException(
            "User not in HttpFSServer admin group");
@@ -304,7 +258,7 @@ public class HttpFSServer {
      case GETCONTENTSUMMARY: {
        FSOperations.FSContentSummary command =
          new FSOperations.FSContentSummary(path);
-        Map json = fsExecute(user, doAs, command);
+        Map json = fsExecute(user, command);
        AUDIT_LOG.info("[{}]", path);
        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
        break;
@@ -312,7 +266,7 @@ public class HttpFSServer {
      case GETFILECHECKSUM: {
        FSOperations.FSFileChecksum command =
          new FSOperations.FSFileChecksum(path);
-        Map json = fsExecute(user, doAs, command);
+        Map json = fsExecute(user, command);
        AUDIT_LOG.info("[{}]", path);
        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
        break;
@@ -324,7 +278,7 @@ public class HttpFSServer {
      case GETACLSTATUS: {
        FSOperations.FSAclStatus command =
          new FSOperations.FSAclStatus(path);
-        Map json = fsExecute(user, doAs, command);
+        Map json = fsExecute(user, command);
        AUDIT_LOG.info("ACL status for [{}]", path);
        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
        break;
@@ -337,7 +291,7 @@ public class HttpFSServer {
        FSOperations.FSGetXAttrs command = new FSOperations.FSGetXAttrs(path,
            xattrNames, encoding);
        @SuppressWarnings("rawtypes")
-        Map json = fsExecute(user, doAs, command);
+        Map json = fsExecute(user, command);
        AUDIT_LOG.info("XAttrs for [{}]", path);
        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
        break;
@@ -345,7 +299,7 @@ public class HttpFSServer {
      case LISTXATTRS: {
        FSOperations.FSListXAttrs command = new FSOperations.FSListXAttrs(path);
        @SuppressWarnings("rawtypes")
-        Map json = fsExecute(user, doAs, command);
+        Map json = fsExecute(user, command);
        AUDIT_LOG.info("XAttr names for [{}]", path);
        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
        break;
@@ -363,7 +317,6 @@ public class HttpFSServer {
   /**
    * Binding to handle DELETE requests.
    *
-   * @param user the principal of the user making the request.
    * @param path the path for operation.
    * @param op the HttpFS operation of the request.
    * @param params the HttpFS parameters of the request.
@@ -379,15 +332,14 @@ public class HttpFSServer {
   @DELETE
   @Path("{path:.*}")
   @Produces(MediaType.APPLICATION_JSON)
-  public Response delete(@Context Principal user,
-                         @PathParam("path") String path,
-                         @QueryParam(OperationParam.NAME) OperationParam op,
-                         @Context Parameters params)
+  public Response delete(@PathParam("path") String path,
+                         @QueryParam(OperationParam.NAME) OperationParam op,
+                         @Context Parameters params)
     throws IOException, FileSystemAccessException {
+    UserGroupInformation user = HttpUserGroupInformation.get();
     Response response;
     path = makeAbsolute(path);
     MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name());
-    String doAs = params.get(DoAsParam.NAME, DoAsParam.class);
     switch (op.value()) {
       case DELETE: {
         Boolean recursive =
@@ -395,7 +347,7 @@ public class HttpFSServer {
           params.get(RecursiveParam.NAME,  RecursiveParam.class);
         AUDIT_LOG.info("[{}] recursive [{}]", path, recursive);
         FSOperations.FSDelete command =
           new FSOperations.FSDelete(path, recursive);
-        JSONObject json = fsExecute(user, doAs, command);
+        JSONObject json = fsExecute(user, command);
         response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
         break;
       }
@@ -412,7 +364,6 @@
    * Binding to handle POST requests.
    *
    * @param is the inputstream for the request payload.
-   * @param user the principal of the user making the request.
    * @param uriInfo the of the request.
    * @param path the path for operation.
    * @param op the HttpFS operation of the request.
@@ -431,18 +382,17 @@ public class HttpFSServer {
   @Consumes({"*/*"})
   @Produces({MediaType.APPLICATION_JSON})
   public Response post(InputStream is,
-                       @Context Principal user,
                        @Context UriInfo uriInfo,
                        @PathParam("path") String path,
                        @QueryParam(OperationParam.NAME) OperationParam op,
                        @Context Parameters params)
     throws IOException, FileSystemAccessException {
+    UserGroupInformation user = HttpUserGroupInformation.get();
     Response response;
     path = makeAbsolute(path);
     MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name());
     switch (op.value()) {
       case APPEND: {
-        String doAs = params.get(DoAsParam.NAME, DoAsParam.class);
         Boolean hasData = params.get(DataParam.NAME, DataParam.class);
         if (!hasData) {
           response = Response.temporaryRedirect(
@@ -451,7 +401,7 @@
         } else {
           FSOperations.FSAppend command =
             new FSOperations.FSAppend(is, path);
-          fsExecute(user, doAs, command);
+          fsExecute(user, command);
           AUDIT_LOG.info("[{}]", path);
           response = Response.ok().type(MediaType.APPLICATION_JSON).build();
         }
@@ -463,7 +413,7 @@
         FSOperations.FSConcat command =
             new FSOperations.FSConcat(path, sources.split(","));
-        fsExecute(user, null, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}]", path);
         System.out.println("SENT RESPONSE");
         response = Response.ok().build();
@@ -498,7 +448,6 @@
    * Binding to handle PUT requests.
    *
    * @param is the inputstream for the request payload.
-   * @param user the principal of the user making the request.
    * @param uriInfo the of the request.
    * @param path the path for operation.
    * @param op the HttpFS operation of the request.
@@ -517,16 +466,15 @@ public class HttpFSServer {
   @Consumes({"*/*"})
   @Produces({MediaType.APPLICATION_JSON})
   public Response put(InputStream is,
-                      @Context Principal user,
                       @Context UriInfo uriInfo,
                       @PathParam("path") String path,
                       @QueryParam(OperationParam.NAME) OperationParam op,
                       @Context Parameters params)
     throws IOException, FileSystemAccessException {
+    UserGroupInformation user = HttpUserGroupInformation.get();
     Response response;
     path = makeAbsolute(path);
     MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name());
-    String doAs = params.get(DoAsParam.NAME, DoAsParam.class);
     switch (op.value()) {
       case CREATE: {
         Boolean hasData = params.get(DataParam.NAME, DataParam.class);
@@ -546,7 +494,7 @@
           FSOperations.FSCreate command =
             new FSOperations.FSCreate(is, path, permission, override,
                                       replication, blockSize);
-          fsExecute(user, doAs, command);
+          fsExecute(user, command);
           AUDIT_LOG.info(
             "[{}] permission [{}] override [{}] replication [{}] blockSize [{}]",
             new Object[]{path, permission, override, replication, blockSize});
@@ -564,7 +512,7 @@
         FSOperations.FSSetXAttr command = new FSOperations.FSSetXAttr(
             path, xattrName, xattrValue, flag);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] to xAttr [{}]", path, xattrName);
         response = Response.ok().build();
         break;
@@ -573,7 +521,7 @@
         String xattrName = params.get(XAttrNameParam.NAME, XAttrNameParam.class);
         FSOperations.FSRemoveXAttr command = new FSOperations.FSRemoveXAttr(
             path, xattrName);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] removed xAttr [{}]", path, xattrName);
         response = Response.ok().build();
         break;
@@ -583,7 +531,7 @@
                                            PermissionParam.class);
         FSOperations.FSMkdirs command =
           new FSOperations.FSMkdirs(path, permission);
-        JSONObject json = fsExecute(user, doAs, command);
+        JSONObject json = fsExecute(user, command);
         AUDIT_LOG.info("[{}] permission [{}]", path, permission);
         response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
         break;
@@ -592,7 +540,7 @@
         String toPath = params.get(DestinationParam.NAME, DestinationParam.class);
         FSOperations.FSRename command =
           new FSOperations.FSRename(path, toPath);
-        JSONObject json = fsExecute(user, doAs, command);
+        JSONObject json = fsExecute(user, command);
         AUDIT_LOG.info("[{}] to [{}]", path, toPath);
         response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
         break;
@@ -602,7 +550,7 @@
         String group = params.get(GroupParam.NAME, GroupParam.class);
         FSOperations.FSSetOwner command =
           new FSOperations.FSSetOwner(path, owner, group);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] to (O/G)[{}]", path, owner + ":" + group);
         response = Response.ok().build();
         break;
@@ -612,7 +560,7 @@
                                            PermissionParam.class);
         FSOperations.FSSetPermission command =
           new FSOperations.FSSetPermission(path, permission);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] to [{}]", path, permission);
         response = Response.ok().build();
         break;
@@ -622,7 +570,7 @@
                                             ReplicationParam.class);
         FSOperations.FSSetReplication command =
           new FSOperations.FSSetReplication(path, replication);
-        JSONObject json = fsExecute(user, doAs, command);
+        JSONObject json = fsExecute(user, command);
         AUDIT_LOG.info("[{}] to [{}]", path, replication);
         response = Response.ok(json).build();
         break;
@@ -634,7 +582,7 @@
                                           AccessTimeParam.class);
         FSOperations.FSSetTimes command =
           new FSOperations.FSSetTimes(path, modifiedTime, accessTime);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] to (M/A)[{}]", path,
                        modifiedTime + ":" + accessTime);
         response = Response.ok().build();
@@ -645,7 +593,7 @@
                                       AclPermissionParam.class);
         FSOperations.FSSetAcl command =
           new FSOperations.FSSetAcl(path, aclSpec);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] to acl [{}]", path, aclSpec);
         response = Response.ok().build();
         break;
@@ -653,7 +601,7 @@
       case REMOVEACL: {
         FSOperations.FSRemoveAcl command =
           new FSOperations.FSRemoveAcl(path);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] removed acl", path);
         response = Response.ok().build();
         break;
@@ -663,7 +611,7 @@
                                       AclPermissionParam.class);
         FSOperations.FSModifyAclEntries command =
           new FSOperations.FSModifyAclEntries(path, aclSpec);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] modify acl entry with [{}]", path, aclSpec);
         response = Response.ok().build();
         break;
@@ -673,7 +621,7 @@
                                       AclPermissionParam.class);
         FSOperations.FSRemoveAclEntries command =
           new FSOperations.FSRemoveAclEntries(path, aclSpec);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] remove acl entry [{}]", path, aclSpec);
         response = Response.ok().build();
         break;
@@ -681,7 +629,7 @@
       case REMOVEDEFAULTACL: {
         FSOperations.FSRemoveDefaultAcl command =
           new FSOperations.FSRemoveDefaultAcl(path);
-        fsExecute(user, doAs, command);
+        fsExecute(user, command);
         AUDIT_LOG.info("[{}] remove default acl", path);
         response = Response.ok().build();
         break;
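The pattern applied throughout the file is the same: instead of a Principal plus an explicit doAs lookup, each handler asks the filter chain for the already-authenticated, already-impersonation-checked caller. A condensed sketch of that shape (assumes hadoop-common on the classpath; the Executor interface below is a stand-in for FileSystemAccess.FileSystemExecutor, not the real API):

    import java.io.IOException;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.delegation.web.HttpUserGroupInformation;

    // Condensed sketch of the new handler shape, not a drop-in replacement.
    public class UgiHandlerSketch {
      interface Executor<T> {
        T execute(String shortUserName) throws IOException;
      }

      <T> T fsExecute(Executor<T> executor) throws IOException {
        // The filter chain has already authenticated the caller and applied
        // the common proxyuser checks, so handlers just fetch the UGI.
        UserGroupInformation ugi = HttpUserGroupInformation.get();
        return executor.execute(ugi.getShortUserName());
      }
    }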
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServerWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServerWebApp.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServerWebApp.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServerWebApp.java Sat Aug 16 21:02:21 2014
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.lib.server.ServerException;
 import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.lib.servlet.ServerWebApp;
-import org.apache.hadoop.lib.wsrs.UserProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,9 +102,6 @@ public class HttpFSServerWebApp extends
     LOG.info("Connects to Namenode [{}]",
              get().get(FileSystemAccess.class).getFileSystemConfiguration().
                get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
-    String userPattern = getConfig().get(UserProvider.USER_PATTERN_KEY,
-        UserProvider.USER_PATTERN_DEFAULT);
-    UserProvider.setUserPattern(userPattern);
   }
 
   /**

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/resources/httpfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/resources/httpfs-default.xml?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/resources/httpfs-default.xml (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/resources/httpfs-default.xml Sat Aug 16 21:02:21 2014
@@ -34,7 +34,6 @@
       org.apache.hadoop.lib.service.instrumentation.InstrumentationService,
       org.apache.hadoop.lib.service.scheduler.SchedulerService,
       org.apache.hadoop.lib.service.security.GroupsService,
-      org.apache.hadoop.lib.service.security.ProxyUserService,
       org.apache.hadoop.lib.service.hadoop.FileSystemAccessService
     </value>
     <description>
@@ -118,6 +117,10 @@
 </property>
 
 <!-- HttpFSServer proxy user Configuration -->
+<!--
+
+  The following 2 properties within this comment are provided as an
+  example to facilitate configuring HttpFS proxyusers.
 
 <property>
   <name>httpfs.proxyuser.#USER#.hosts</name>
@@ -152,6 +155,7 @@ in the property name.
   </description>
 </property>
+-->
 
 <!-- HttpFS Delegation Token configuration -->
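With the shipped proxyuser properties now commented out, impersonation must be granted explicitly per proxy user. A minimal sketch of the equivalent settings in code; the "knox" user, host, and group are hypothetical, and in practice the values belong in httpfs-site.xml rather than being set programmatically:

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical grant: proxy user "knox" may impersonate members of
    // "analysts" when connecting from gateway.example.com.
    public class ProxyuserGrantSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.set("httpfs.proxyuser.knox.hosts", "gateway.example.com");
        conf.set("httpfs.proxyuser.knox.groups", "analysts");
        System.out.println(conf.get("httpfs.proxyuser.knox.hosts"));
      }
    }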
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Sat Aug 16 21:02:21 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentNa
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
@@ -407,4 +408,80 @@ public class TestWrites {
       }
     }
   }
+
+  @Test
+  public void testOOOWrites() throws IOException, InterruptedException {
+    NfsConfiguration config = new NfsConfiguration();
+    MiniDFSCluster cluster = null;
+    RpcProgramNfs3 nfsd;
+    final int bufSize = 32;
+    final int numOOO = 3;
+    SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandler.getUser()).thenReturn(
+        System.getProperty("user.name"));
+    String currentUser = System.getProperty("user.name");
+    config.set(
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(currentUser),
+        "*");
+    config.set(
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(currentUser),
+        "*");
+    ProxyUsers.refreshSuperUserGroupsConfiguration(config);
+    // Use emphral port in case tests are running in parallel
+    config.setInt("nfs3.mountd.port", 0);
+    config.setInt("nfs3.server.port", 0);
+
+    try {
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+
+      Nfs3 nfs3 = new Nfs3(config);
+      nfs3.startServiceInternal(false);
+      nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
+
+      DFSClient dfsClient = new DFSClient(NameNode.getAddress(config), config);
+      HdfsFileStatus status = dfsClient.getFileInfo("/");
+      FileHandle rootHandle = new FileHandle(status.getFileId());
+
+      CREATE3Request createReq = new CREATE3Request(rootHandle,
+          "out-of-order-write" + System.currentTimeMillis(),
+          Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+      XDR createXdr = new XDR();
+      createReq.serialize(createXdr);
+      CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
+          securityHandler, new InetSocketAddress("localhost", 1234));
+      FileHandle handle = createRsp.getObjHandle();
+
+      byte[][] oooBuf = new byte[numOOO][bufSize];
+      for (int i = 0; i < numOOO; i++) {
+        Arrays.fill(oooBuf[i], (byte) i);
+      }
+
+      for (int i = 0; i < numOOO; i++) {
+        final long offset = (numOOO - 1 - i) * bufSize;
+        WRITE3Request writeReq = new WRITE3Request(handle, offset, bufSize,
+            WriteStableHow.UNSTABLE, ByteBuffer.wrap(oooBuf[i]));
+        XDR writeXdr = new XDR();
+        writeReq.serialize(writeXdr);
+        nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
+            new InetSocketAddress("localhost", 1234));
+      }
+
+      waitWrite(nfsd, handle, 60000);
+      READ3Request readReq = new READ3Request(handle, bufSize, bufSize);
+      XDR readXdr = new XDR();
+      readReq.serialize(readXdr);
+      READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
+          securityHandler, new InetSocketAddress("localhost", config.getInt(
+              NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
+              NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT)));
+      assertTrue(Arrays.equals(oooBuf[1], readRsp.getData().array()));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
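The test drives the gateway with writes at descending offsets, so packets arrive in reverse order and the read-back of the middle buffer proves reassembly. A standalone sketch of the offset arithmetic the test relies on, using its own constants:

    // With bufSize = 32 and numOOO = 3, request i targets offset
    // (numOOO - 1 - i) * bufSize, so writes arrive at offsets 64, 32, 0.
    public class OooOffsetSketch {
      public static void main(String[] args) {
        final int bufSize = 32;
        final int numOOO = 3;
        for (int i = 0; i < numOOO; i++) {
          final long offset = (numOOO - 1 - i) * (long) bufSize;
          System.out.println("write #" + i + " -> offset " + offset);
        }
      }
    }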
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Aug 16 21:02:21 2014
@@ -160,9 +160,6 @@ Trunk (Unreleased)
 
   BUG FIXES
 
-    HDFS-6517. Remove hadoop-metrics2.properties from hdfs project (Akira
-    AJISAKA via aw)
-
     HADOOP-9635 Fix potential Stack Overflow in DomainSocket.c (V. Karthik
     Kumar via cmccabe)
 
@@ -416,6 +413,18 @@ Release 2.6.0 - UNRELEASED
     HDFS-6838. Code cleanup for unnecessary INode replacement.
     (Jing Zhao via wheat9)
 
+    HDFS-6836. HDFS INFO logging is verbose & uses file appenders. (Xiaoyu
+    Yao via Arpit Agarwal)
+
+    HDFS-6567. Normalize the order of public final in HdfsFileStatus.
+    (Tassapol Athiapinya via wheat9)
+
+    HDFS-6849. Replace HttpFS custom proxyuser handling with common
+    implementation. (tucu)
+
+    HDFS-6850. Move NFS out of order write unit tests into TestWrites class.
+    (Zhe Zhang via atm)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
 
@@ -425,6 +434,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for
     insecure HDFS (Allen Wittenauer via raviprak)
 
+    HDFS-6517. Remove hadoop-metrics2.properties from hdfs project (Akira
+    AJISAKA via aw)
+
     HDFS-6617. Flake TestDFSZKFailoverController.testManualFailoverWithDFSHAAdmin
     due to a long edit log sync op. (Liang Xie via cnauroth)
 
@@ -514,6 +526,14 @@ Release 2.6.0 - UNRELEASED
     HDFS-6582. Missing null check in RpcProgramNfs3#read(XDR, SecurityHandler)
     (Abhiraj Butala via brandonli)
 
+    HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
+    block replica (Arpit Agarwal)
+
+    HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate
+    responses to Balancer (vinayakumarb)
+
+    HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. (Yi Liu and Colin Patrick McCabe via umamahesh)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1617377-1618416

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java Sat Aug 16 21:02:21 2014
@@ -93,7 +93,7 @@ public class HdfsFileStatus {
    * Get the length of this file, in bytes.
    * @return the length of this file, in bytes.
    */
-  final public long getLen() {
+  public final long getLen() {
     return length;
   }
 
@@ -101,7 +101,7 @@ public class HdfsFileStatus {
    * Is this a directory?
    * @return true if this is a directory
    */
-  final public boolean isDir() {
+  public final boolean isDir() {
     return isdir;
   }
 
@@ -117,7 +117,7 @@ public class HdfsFileStatus {
    * Get the block size of the file.
    * @return the number of bytes
    */
-  final public long getBlockSize() {
+  public final long getBlockSize() {
     return blocksize;
   }
 
@@ -125,7 +125,7 @@ public class HdfsFileStatus {
    * Get the replication factor of a file.
    * @return the replication factor of a file.
    */
-  final public short getReplication() {
+  public final short getReplication() {
     return block_replication;
   }
 
@@ -133,7 +133,7 @@ public class HdfsFileStatus {
    * Get the modification time of the file.
    * @return the modification time of file in milliseconds since January 1, 1970 UTC.
    */
-  final public long getModificationTime() {
+  public final long getModificationTime() {
     return modification_time;
   }
 
@@ -141,7 +141,7 @@ public class HdfsFileStatus {
    * Get the access time of the file.
    * @return the access time of file in milliseconds since January 1, 1970 UTC.
    */
-  final public long getAccessTime() {
+  public final long getAccessTime() {
     return access_time;
   }
 
@@ -149,7 +149,7 @@ public class HdfsFileStatus {
    * Get FsPermission associated with the file.
    * @return permssion
    */
-  final public FsPermission getPermission() {
+  public final FsPermission getPermission() {
     return permission;
   }
 
@@ -157,7 +157,7 @@ public class HdfsFileStatus {
    * Get the owner of the file.
    * @return owner of the file
    */
-  final public String getOwner() {
+  public final String getOwner() {
     return owner;
   }
 
@@ -165,7 +165,7 @@ public class HdfsFileStatus {
    * Get the group associated with the file.
    * @return group for the file.
    */
-  final public String getGroup() {
+  public final String getGroup() {
     return group;
  }
 
@@ -173,7 +173,7 @@ public class HdfsFileStatus {
    * Check if the local name is empty
    * @return true if the name is empty
    */
-  final public boolean isEmptyLocalName() {
+  public final boolean isEmptyLocalName() {
     return path.length == 0;
   }
 
@@ -181,7 +181,7 @@ public class HdfsFileStatus {
    * Get the string representation of the local name
    * @return the local name in string
    */
-  final public String getLocalName() {
+  public final String getLocalName() {
     return DFSUtil.bytes2String(path);
   }
 
@@ -189,7 +189,7 @@ public class HdfsFileStatus {
    * Get the Java UTF8 representation of the local name
    * @return the local name in java UTF8
    */
-  final public byte[] getLocalNameInBytes() {
+  public final byte[] getLocalNameInBytes() {
     return path;
   }
 
@@ -198,7 +198,7 @@ public class HdfsFileStatus {
    * @param parent the parent path
    * @return the full path in string
    */
-  final public String getFullName(final String parent) {
+  public final String getFullName(final String parent) {
     if (isEmptyLocalName()) {
       return parent;
     }
 
@@ -216,7 +216,7 @@ public class HdfsFileStatus {
    * @param parent the parent path
    * @return the full path
    */
-  final public Path getFullPath(final Path parent) {
+  public final Path getFullPath(final Path parent) {
     if (isEmptyLocalName()) {
       return parent;
     }
 
@@ -228,19 +228,19 @@ public class HdfsFileStatus {
    * Get the string representation of the symlink.
    * @return the symlink as a string.
    */
-  final public String getSymlink() {
+  public final String getSymlink() {
     return DFSUtil.bytes2String(symlink);
   }
 
-  final public byte[] getSymlinkInBytes() {
+  public final byte[] getSymlinkInBytes() {
     return symlink;
   }
 
-  final public long getFileId() {
+  public final long getFileId() {
     return fileId;
   }
 
-  final public int getChildrenNum() {
+  public final int getChildrenNum() {
     return childrenNum;
   }
 
@@ -249,7 +249,7 @@ public class HdfsFileStatus {
     return storagePolicy;
   }
 
-  final public FileStatus makeQualified(URI defaultUri, Path path) {
+  public final FileStatus makeQualified(URI defaultUri, Path path) {
     return new FileStatus(getLen(), isDir(), getReplication(),
         getBlockSize(), getModificationTime(),
         getAccessTime(),

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java Sat Aug 16 21:02:21 2014
@@ -67,7 +67,7 @@ public class HdfsLocatedFileStatus exten
     return locations;
   }
 
-  final public LocatedFileStatus makeQualifiedLocated(URI defaultUri,
+  public final LocatedFileStatus makeQualifiedLocated(URI defaultUri,
       Path path) {
     return new LocatedFileStatus(getLen(), isDir(), getReplication(),
         getBlockSize(), getModificationTime(),
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Sat Aug 16 21:02:21 2014
@@ -87,8 +87,6 @@ public class Dispatcher {
   private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
   private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
-  private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20
-                                                                     // minutes
 
   private final NameNodeConnector nnc;
   private final SaslDataTransferClient saslClient;
@@ -278,13 +276,6 @@ public class Dispatcher {
         sock.connect(
             NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
-        /*
-         * Unfortunately we don't have a good way to know if the Datanode is
-         * taking a really long time to move a block, OR something has gone
-         * wrong and it's never going to finish. To deal with this scenario, we
-         * set a long timeout (20 minutes) to avoid hanging indefinitely.
-         */
-        sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
 
         sock.setKeepAlive(true);
 
@@ -341,8 +332,12 @@ public class Dispatcher {
     /** Receive a block copy response from the input stream */
     private void receiveResponse(DataInputStream in) throws IOException {
-      BlockOpResponseProto response = BlockOpResponseProto
-          .parseFrom(vintPrefixed(in));
+      BlockOpResponseProto response =
+          BlockOpResponseProto.parseFrom(vintPrefixed(in));
+      while (response.getStatus() == Status.IN_PROGRESS) {
+        // read intermediate responses
+        response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+      }
       if (response.getStatus() != Status.SUCCESS) {
         if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
           throw new IOException("block move failed due to access token error");

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Sat Aug 16 21:02:21 2014
@@ -194,24 +194,12 @@ public class BlockInfo extends Block imp
    * Add a {@link DatanodeStorageInfo} location for a block
    */
   boolean addStorage(DatanodeStorageInfo storage) {
-    boolean added = true;
-    int idx = findDatanode(storage.getDatanodeDescriptor());
-    if(idx >= 0) {
-      if (getStorageInfo(idx) == storage) { // the storage is already there
-        return false;
-      } else {
-        // The block is on the DN but belongs to a different storage.
-        // Update our state.
-        removeStorage(getStorageInfo(idx));
-        added = false;      // Just updating storage. Return false.
-      }
-    }
     // find the last null node
     int lastNode = ensureCapacity(1);
     setStorageInfo(lastNode, storage);
     setNext(lastNode, null);
     setPrevious(lastNode, null);
-    return added;
+    return true;
   }
 
   /**
@@ -240,16 +228,18 @@ public class BlockInfo extends Block imp
    * Find specified DatanodeDescriptor.
    * @return index or -1 if not found.
    */
-  int findDatanode(DatanodeDescriptor dn) {
+  boolean findDatanode(DatanodeDescriptor dn) {
     int len = getCapacity();
     for(int idx = 0; idx < len; idx++) {
       DatanodeDescriptor cur = getDatanode(idx);
-      if(cur == dn)
-        return idx;
-      if(cur == null)
+      if(cur == dn) {
+        return true;
+      }
+      if(cur == null) {
         break;
+      }
     }
-    return -1;
+    return false;
   }
 
   /**
   * Find specified DatanodeStorageInfo.
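On the balancer side the new contract is that IN_PROGRESS responses are keep-alives, and the reader loops until a terminal status arrives. A standalone sketch of that loop; the enum below stands in for the protobuf Status, where a real reader would parse delimited BlockOpResponseProto messages from the socket:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Iterator;

    // Sketch of the keep-alive read loop added to Dispatcher.receiveResponse().
    public class KeepAliveLoopSketch {
      enum Status { IN_PROGRESS, SUCCESS, ERROR, ERROR_ACCESS_TOKEN }

      static Status receiveResponse(Iterator<Status> in) throws IOException {
        Status response = in.next();
        while (response == Status.IN_PROGRESS) {
          // Intermediate responses only prove the datanode is still working;
          // keep reading until a terminal status shows up.
          response = in.next();
        }
        if (response != Status.SUCCESS) {
          throw new IOException("block move failed: " + response);
        }
        return response;
      }

      public static void main(String[] args) throws IOException {
        Iterator<Status> wire = Arrays.asList(
            Status.IN_PROGRESS, Status.IN_PROGRESS, Status.SUCCESS).iterator();
        System.out.println(receiveResponse(wire));
      }
    }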
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Sat Aug 16 21:02:21 2014
@@ -2096,7 +2096,7 @@ public class BlockManager {
       // Add replica if appropriate. If the replica was previously corrupt
       // but now okay, it might need to be updated.
       if (reportedState == ReplicaState.FINALIZED
-          && (storedBlock.findDatanode(dn) < 0
+          && (!storedBlock.findDatanode(dn)
           || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
         toAdd.add(storedBlock);
       }
@@ -2277,7 +2277,7 @@ public class BlockManager {
         storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
 
     if (ucBlock.reportedState == ReplicaState.FINALIZED &&
-        block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) {
+        !block.findDatanode(storageInfo.getDatanodeDescriptor())) {
       addStoredBlock(block, storageInfo, null, true);
     }
   }
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Sat Aug 16 21:02:21 2014
@@ -104,21 +104,21 @@ public class CacheReplicationMonitor ext
   private final Condition scanFinished;
 
   /**
-   * Whether there are pending CacheManager operations that necessitate a
-   * CacheReplicationMonitor rescan. Protected by the CRM lock.
+   * The number of rescans completed. Used to wait for scans to finish.
+   * Protected by the CacheReplicationMonitor lock.
    */
-  private boolean needsRescan = true;
+  private long completedScanCount = 0;
 
   /**
-   * Whether we are currently doing a rescan. Protected by the CRM lock.
+   * The scan we're currently performing, or -1 if no scan is in progress.
+   * Protected by the CacheReplicationMonitor lock.
    */
-  private boolean isScanning = false;
+  private long curScanCount = -1;
 
   /**
-   * The number of rescans completed. Used to wait for scans to finish.
-   * Protected by the CacheReplicationMonitor lock.
+   * The number of rescans we need to complete. Protected by the CRM lock.
    */
-  private long scanCount = 0;
+  private long neededScanCount = 0;
 
   /**
    * True if this monitor should terminate. Protected by the CRM lock.
@@ -169,7 +169,7 @@ public class CacheReplicationMonitor ext
             LOG.info("Shutting down CacheReplicationMonitor");
             return;
           }
-          if (needsRescan) {
+          if (completedScanCount < neededScanCount) {
             LOG.info("Rescanning because of pending operations");
             break;
           }
@@ -182,8 +182,6 @@ public class CacheReplicationMonitor ext
           doRescan.await(delta, TimeUnit.MILLISECONDS);
           curTimeMs = Time.monotonicNow();
         }
-        isScanning = true;
-        needsRescan = false;
       } finally {
         lock.unlock();
       }
@@ -194,8 +192,8 @@ public class CacheReplicationMonitor ext
       // Update synchronization-related variables.
       lock.lock();
       try {
-        isScanning = false;
-        scanCount++;
+        completedScanCount = curScanCount;
+        curScanCount = -1;
         scanFinished.signalAll();
       } finally {
         lock.unlock();
       }
@@ -226,16 +224,15 @@ public class CacheReplicationMonitor ext
         "Must not hold the FSN write lock when waiting for a rescan.");
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when waiting for a rescan.");
-    if (!needsRescan) {
+    if (neededScanCount <= completedScanCount) {
       return;
     }
     // If no scan is already ongoing, mark the CRM as dirty and kick
-    if (!isScanning) {
+    if (curScanCount < 0) {
       doRescan.signal();
     }
    // Wait until the scan finishes and the count advances
-    final long startCount = scanCount;
-    while ((!shutdown) && (startCount >= scanCount)) {
+    while ((!shutdown) && (completedScanCount < neededScanCount)) {
      try {
        scanFinished.await();
      } catch (InterruptedException e) {
@@ -253,7 +250,14 @@ public class CacheReplicationMonitor ext
   public void setNeedsRescan() {
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when setting the needsRescan bit.");
-    this.needsRescan = true;
+    if (curScanCount >= 0) {
+      // If there is a scan in progress, we need to wait for the scan after
+      // that.
+      neededScanCount = curScanCount + 1;
+    } else {
+      // If there is no scan in progress, we need to wait for the next scan.
+      neededScanCount = completedScanCount + 1;
+    }
   }
 
   /**
@@ -284,10 +288,17 @@ public class CacheReplicationMonitor ext
     scannedBlocks = 0;
     namesystem.writeLock();
     try {
+      lock.lock();
       if (shutdown) {
         throw new InterruptedException("CacheReplicationMonitor was " +
             "shut down.");
       }
+      curScanCount = completedScanCount + 1;
+    }
+    finally {
+      lock.unlock();
+    }
+    try {
       resetStatistics();
       rescanCacheDirectives();
       rescanCachedBlockMap();
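The monitor's dirty flag becomes a pair of monotonically increasing scan counters: setNeedsRescan() picks the first scan that can observe the pending change, and waiters block until completedScanCount catches up. A small standalone sketch of just that counter logic (the locking and condition variables of the real class are omitted):

    // Sketch of the CacheReplicationMonitor counter logic from this change.
    public class ScanCountSketch {
      long completedScanCount = 0;
      long curScanCount = -1;   // -1 means no scan is in progress
      long neededScanCount = 0;

      void setNeedsRescan() {
        // A scan already running cannot see this change, so require the one after it.
        neededScanCount = (curScanCount >= 0) ? curScanCount + 1
                                              : completedScanCount + 1;
      }

      boolean rescanNeeded() {
        return completedScanCount < neededScanCount;
      }

      void beginScan()  { curScanCount = completedScanCount + 1; }

      void finishScan() {
        completedScanCount = curScanCount;
        curScanCount = -1;
      }

      public static void main(String[] args) {
        ScanCountSketch m = new ScanCountSketch();
        m.setNeedsRescan();
        m.beginScan();
        m.setNeedsRescan();   // arrives mid-scan: requires a second scan
        m.finishScan();
        System.out.println(m.rescanNeeded()); // true: the mid-scan change is unseen
      }
    }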
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Sat Aug 16 21:02:21 2014
@@ -208,12 +208,28 @@ public class DatanodeStorageInfo {
   }
 
   public boolean addBlock(BlockInfo b) {
-    if(!b.addStorage(this))
-      return false;
+    // First check whether the block belongs to a different storage
+    // on the same DN.
+    boolean replaced = false;
+    DatanodeStorageInfo otherStorage =
+        b.findStorageInfo(getDatanodeDescriptor());
+
+    if (otherStorage != null) {
+      if (otherStorage != this) {
+        // The block belongs to a different storage. Remove it first.
+        otherStorage.removeBlock(b);
+        replaced = true;
+      } else {
+        // The block is already associated with this storage.
+        return false;
+      }
+    }
+    // add to the head of the data-node list
+    b.addStorage(this);
     blockList = b.listInsert(blockList, this);
     numBlocks++;
-    return true;
+    return !replaced;
   }
 
   boolean removeBlock(BlockInfo b) {
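addBlock() now treats a replica that reappears under a different storage of the same datanode as a move: the stale mapping is removed before the block is inserted at the head of this storage's block list. The return value consequently means "this grew the replica count for the datanode", so both a move and a duplicate add return false. A quick illustration of the contract (names are illustrative; this mirrors the test change further below):

    DatanodeStorageInfo s1 = ...;  // two storages that belong to the
    DatanodeStorageInfo s2 = ...;  // same DatanodeDescriptor
    BlockInfo b = new BlockInfo(3);

    s1.addBlock(b);  // true:  first replica of b on this datanode
    s1.addBlock(b);  // false: b already mapped to this exact storage
    s2.addBlock(b);  // false: b moved from s1 to s2; count unchanged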
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Sat Aug 16 21:02:21 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -123,6 +124,14 @@ class BlockReceiver implements Closeable
   private boolean syncOnClose;
   private long restartBudget;
 
+  /**
+   * for replaceBlock response
+   */
+  private final long responseInterval;
+  private long lastResponseTime = 0;
+  private boolean isReplaceBlock = false;
+  private DataOutputStream replyOut = null;
+
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
       final String inAddr, final String myAddr,
@@ -144,6 +153,9 @@ class BlockReceiver implements Closeable
     this.isClient = !this.isDatanode;
     this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
     this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
+    // For replaceBlock() calls response should be sent to avoid socketTimeout
+    // at clients. So sending with the interval of 0.5 * socketTimeout
+    this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
     //for datanode, we have
     //1: clientName.length() == 0, and
     //2: stage == null or PIPELINE_SETUP_CREATE
@@ -651,6 +663,20 @@ class BlockReceiver implements Closeable
           lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
+    /*
+     * Send in-progress responses for the replaceBlock() calls back to caller to
+     * avoid timeouts due to balancer throttling. HDFS-6247
+     */
+    if (isReplaceBlock
+        && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
+      BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
+          .setStatus(Status.IN_PROGRESS);
+      response.build().writeDelimitedTo(replyOut);
+      replyOut.flush();
+
+      lastResponseTime = Time.monotonicNow();
+    }
+
     if (throttler != null) { // throttle I/O
       throttler.throttle(len);
     }
@@ -718,7 +744,8 @@ class BlockReceiver implements Closeable
       DataInputStream mirrIn, // input from next datanode
       DataOutputStream replyOut, // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      DatanodeInfo[] downstreams) throws IOException {
+      DatanodeInfo[] downstreams,
+      boolean isReplaceBlock) throws IOException {
 
     syncOnClose = datanode.getDnConf().syncOnClose;
     boolean responderClosed = false;
@@ -726,6 +753,9 @@ class BlockReceiver implements Closeable
     mirrorAddr = mirrAddr;
     throttler = throttlerArg;
 
+    this.replyOut = replyOut;
+    this.isReplaceBlock = isReplaceBlock;
+
     try {
       if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup,

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Sat Aug 16 21:02:21 2014
@@ -687,7 +687,7 @@ class BlockSender implements java.io.Clo
     // Trigger readahead of beginning of file if configured.
     manageOsCache();
 
-    final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+    final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
     try {
       int maxChunksPerPacket;
       int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
@@ -733,9 +733,9 @@ class BlockSender implements java.io.Clo
         sentEntireByteRange = true;
       }
     } finally {
-      if (clientTraceFmt != null) {
+      if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
         final long endTime = System.nanoTime();
-        ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
+        ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
             initialOffset, endTime - startTime));
       }
       close();
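The balancer throttles replaceBlock() transfers, so a single move can legitimately outlast the peer's read timeout; sending Status.IN_PROGRESS every 0.5 * socketTimeout means the waiting side sees traffic at least twice per timeout window even when one keepalive is delayed by throttling. The same logic factored into a stand-alone helper, purely to isolate the moving parts (hypothetical class; the committed code inlines this in the packet loop above):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
    import org.apache.hadoop.util.Time;

    class ReplaceBlockKeepalive {
      private final DataOutputStream replyOut;
      private final long responseInterval;
      private long lastResponseTime;

      ReplaceBlockKeepalive(DataOutputStream replyOut, int socketTimeoutMs) {
        this.replyOut = replyOut;
        // Half the peer's read timeout: a single delayed keepalive still
        // leaves another one inside the timeout window.
        this.responseInterval = (long) (socketTimeoutMs * 0.5);
        this.lastResponseTime = Time.monotonicNow();
      }

      // Call once per received packet; sends at most one keepalive per interval.
      void maybeSend() throws IOException {
        long now = Time.monotonicNow();
        if (now - lastResponseTime > responseInterval) {
          BlockOpResponseProto.newBuilder()
              .setStatus(Status.IN_PROGRESS)
              .build()
              .writeDelimitedTo(replyOut);
          replyOut.flush();
          lastResponseTime = now;
        }
      }
    }

The BlockSender hunks are independent housekeeping: per-transfer client trace lines drop from INFO to DEBUG, and the System.nanoTime() bookkeeping is now skipped unless DEBUG logging is actually enabled.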
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Sat Aug 16 21:02:21 2014
@@ -708,7 +708,7 @@ class DataXceiver extends Receiver imple
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets);
+            mirrorAddr, null, targets, false);
 
         // send close-ack for transfer-RBW/Finalized
         if (isTransfer) {
@@ -983,7 +983,7 @@ class DataXceiver extends Receiver imple
     String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
-
+    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     try {
       // get the output stream to the proxy
       final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1040,8 +1040,8 @@ class DataXceiver extends Receiver imple
           CachingStrategy.newDropBehind());
 
       // receive a block
-      blockReceiver.receiveBlock(null, null, null, null,
-          dataXceiverServer.balanceThrottler, null);
+      blockReceiver.receiveBlock(null, null, replyOut, null,
+          dataXceiverServer.balanceThrottler, null, true);
 
       // notify name node
       datanode.notifyNamenodeReceivedBlock(
@@ -1076,6 +1076,7 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(proxyReply);
+      IOUtils.closeStream(replyOut);
     }
 
     //update metrics

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Sat Aug 16 21:02:21 2014
@@ -207,6 +207,7 @@ enum Status {
   OOB_RESERVED1 = 9;          // Reserved
   OOB_RESERVED2 = 10;         // Reserved
   OOB_RESERVED3 = 11;         // Reserved
+  IN_PROGRESS = 12;
 }
 
 message PipelineAckProto {
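With IN_PROGRESS added to the Status enum, a reader of a replaceBlock response can no longer trust the first delimited message: keepalives must be drained until a terminal status arrives, which is exactly what the TestBlockReplacement change below does inline. A hedged sketch of that reader side as a stand-alone helper (hypothetical method name):

    import java.io.DataInputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    static Status readReplaceBlockStatus(DataInputStream reply) throws IOException {
      BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
      // Each keepalive is a complete delimited message; only the last
      // response carries the real outcome of the replaceBlock() call.
      while (proto.getStatus() == Status.IN_PROGRESS) {
        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
      }
      return proto.getStatus();
    }

On the DataXceiver side, replyOut is now opened before the proxy connection is attempted so that keepalives and the final status share one stream, and it is closed alongside the other streams in the finally block.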
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java Sat Aug 16 21:02:21 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -59,17 +60,24 @@ public class TestBlockInfo {
 
   @Test
-  public void testReplaceStorageIfDifferetnOneAlreadyExistedFromSameDataNode() throws Exception {
-    BlockInfo blockInfo = new BlockInfo(3);
+  public void testReplaceStorage() throws Exception {
+    // Create two dummy storages.
     final DatanodeStorageInfo storage1 = DFSTestUtil.createDatanodeStorageInfo("storageID1", "127.0.0.1");
     final DatanodeStorageInfo storage2 = new DatanodeStorageInfo(storage1.getDatanodeDescriptor(), new DatanodeStorage("storageID2"));
+    final int NUM_BLOCKS = 10;
+    BlockInfo[] blockInfos = new BlockInfo[NUM_BLOCKS];
 
-    blockInfo.addStorage(storage1);
-    boolean added = blockInfo.addStorage(storage2);
+    // Create a few dummy blocks and add them to the first storage.
+    for (int i = 0; i < NUM_BLOCKS; ++i) {
+      blockInfos[i] = new BlockInfo(3);
+      storage1.addBlock(blockInfos[i]);
+    }
 
-    Assert.assertFalse(added);
-    Assert.assertEquals(storage2, blockInfo.getStorageInfo(0));
+    // Try to move one of the blocks to a different storage.
+    boolean added = storage2.addBlock(blockInfos[NUM_BLOCKS/2]);
+    Assert.assertThat(added, is(false));
+    Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
   }
 
   @Test

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Sat Aug 16 21:02:21 2014
@@ -272,8 +272,10 @@ public class TestBlockReplacement {
 
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());
 
-    BlockOpResponseProto proto =
-        BlockOpResponseProto.parseDelimitedFrom(reply);
+    BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+    while (proto.getStatus() == Status.IN_PROGRESS) {
+      proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+    }
     return proto.getStatus() == Status.SUCCESS;
   }
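Together the two tests pin the new behavior down end to end: TestBlockInfo asserts that moving a block between two storages of one datanode returns false and rebinds getStorageInfo(0) to the target storage, while TestBlockReplacement asserts that a client loops past any number of IN_PROGRESS keepalives before checking for SUCCESS. To run just these two from hadoop-hdfs-project/hadoop-hdfs (standard Surefire test selection; exact flags can vary by branch):

    mvn test -Dtest=TestBlockInfo,TestBlockReplacement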
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml?rev=1618417&r1=1618416&r2=1618417&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml Sat Aug 16 21:02:21 2014
@@ -8655,6 +8655,50 @@
       </comparators>
     </test>
 
+    <test> <!-- TESTED -->
+      <description>count: file using -h option</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir -p dir</command> <!-- make sure user home dir exists -->
+        <command>-fs NAMENODE -put CLITEST_DATA/data15bytes file1</command>
+        <command>-fs NAMENODE -put CLITEST_DATA/data1k file2</command>
+        <command>-fs NAMENODE -count -h file1 file2</command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm file1 file2</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>( |\t)*0( |\t)*1( |\t)*15 file1</expected-output>
+        </comparator>
+      </comparators>
+      <comparators>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>( |\t)*0( |\t)*1( |\t)*1\.0 K file2</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    <test> <!-- TESTED -->
+      <description>count: directory using -q and -h options</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /dir1</command>
+        <dfs-admin-command>-fs NAMENODE -setQuota 10 /dir1 </dfs-admin-command>
+        <dfs-admin-command>-fs NAMENODE -setSpaceQuota 1m /dir1 </dfs-admin-command>
+        <command>-fs NAMENODE -count -q -h /dir1</command>
+      </test-commands>
+      <cleanup-commands>
+        <command>-fs NAMENODE -rm -r /dir1</command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>( |\t)*10( |\t)*9( |\t)*1 M( |\t)*1 M( |\t)*1( |\t)*0( |\t)*0 /dir1</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
     <!-- Tests for chmod -->
     <test> <!-- TESTED -->
       <description>chmod: change permission(octal mode) of file in absolute path</description>
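The two new CLI cases cover human-readable output for count. A roughly equivalent manual session (paths and column alignment illustrative; plain -count prints DIR_COUNT, FILE_COUNT, CONTENT_SIZE, PATHNAME, and -q prepends QUOTA, REM_QUOTA, SPACE_QUOTA, REM_SPACE_QUOTA):

    $ hdfs dfs -count -h file1 file2
               0            1                 15 file1
               0            1              1.0 K file2
    $ hdfs dfs -count -q -h /dir1
          10           9            1 M          1 M        1        0              0 /dir1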