Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode.snapshot; import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -29,9 +30,11 @@ import java.util.concurrent.atomic.Atomi import javax.management.ObjectName; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.protocol.SnapshotInfo; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; import org.apache.hadoop.hdfs.server.namenode.FSDirectory; import org.apache.hadoop.hdfs.server.namenode.FSImageFormat; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; @@ -39,9 +42,10 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INodeDirectory; import org.apache.hadoop.hdfs.server.namenode.INodesInPath; -import 
org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo; import org.apache.hadoop.metrics2.util.MBeans; +import com.google.common.base.Preconditions; + /** * Manage snapshottable directories and their snapshots. * @@ -64,8 +68,8 @@ public class SnapshotManager implements private int snapshotCounter = 0; /** All snapshottable directories in the namesystem. */ - private final Map<Long, INodeDirectorySnapshottable> snapshottables - = new HashMap<Long, INodeDirectorySnapshottable>(); + private final Map<Long, INodeDirectory> snapshottables = + new HashMap<Long, INodeDirectory>(); public SnapshotManager(final FSDirectory fsdir) { this.fsdir = fsdir; @@ -82,7 +86,7 @@ public class SnapshotManager implements return; } - for(INodeDirectorySnapshottable s : snapshottables.values()) { + for(INodeDirectory s : snapshottables.values()) { if (s.isAncestorDirectory(dir)) { throw new SnapshotException( "Nested snapshottable directories not allowed: path=" + path @@ -110,33 +114,30 @@ public class SnapshotManager implements checkNestedSnapshottable(d, path); } - - final INodeDirectorySnapshottable s; if (d.isSnapshottable()) { //The directory is already a snapshottable directory. - s = (INodeDirectorySnapshottable)d; - s.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT); + d.setSnapshotQuota(DirectorySnapshottableFeature.SNAPSHOT_LIMIT); } else { - s = d.replaceSelf4INodeDirectorySnapshottable(iip.getLatestSnapshotId(), - fsdir.getINodeMap()); + d.addSnapshottableFeature(); } - addSnapshottable(s); + addSnapshottable(d); } /** Add the given snapshottable directory to {@link #snapshottables}. */ - public void addSnapshottable(INodeDirectorySnapshottable dir) { + public void addSnapshottable(INodeDirectory dir) { + Preconditions.checkArgument(dir.isSnapshottable()); snapshottables.put(dir.getId(), dir); } /** Remove the given snapshottable directory from {@link #snapshottables}. 
*/ - private void removeSnapshottable(INodeDirectorySnapshottable s) { + private void removeSnapshottable(INodeDirectory s) { snapshottables.remove(s.getId()); } /** Remove snapshottable directories from {@link #snapshottables} */ - public void removeSnapshottable(List<INodeDirectorySnapshottable> toRemove) { + public void removeSnapshottable(List<INodeDirectory> toRemove) { if (toRemove != null) { - for (INodeDirectorySnapshottable s : toRemove) { + for (INodeDirectory s : toRemove) { removeSnapshottable(s); } } @@ -150,22 +151,22 @@ public class SnapshotManager implements public void resetSnapshottable(final String path) throws IOException { final INodesInPath iip = fsdir.getINodesInPath4Write(path); final INodeDirectory d = INodeDirectory.valueOf(iip.getLastINode(), path); - if (!d.isSnapshottable()) { + DirectorySnapshottableFeature sf = d.getDirectorySnapshottableFeature(); + if (sf == null) { // the directory is already non-snapshottable return; } - final INodeDirectorySnapshottable s = (INodeDirectorySnapshottable) d; - if (s.getNumSnapshots() > 0) { + if (sf.getNumSnapshots() > 0) { throw new SnapshotException("The directory " + path + " has snapshot(s). " + "Please redo the operation after removing all the snapshots."); } - if (s == fsdir.getRoot()) { - s.setSnapshotQuota(0); + if (d == fsdir.getRoot()) { + d.setSnapshotQuota(0); } else { - s.replaceSelf(iip.getLatestSnapshotId(), fsdir.getINodeMap()); + d.removeSnapshottableFeature(); } - removeSnapshottable(s); + removeSnapshottable(d); } /** @@ -178,10 +179,15 @@ public class SnapshotManager implements * Throw IOException when the given path does not lead to an * existing snapshottable directory. 
*/ - public INodeDirectorySnapshottable getSnapshottableRoot(final String path - ) throws IOException { - final INodesInPath i = fsdir.getINodesInPath4Write(path); - return INodeDirectorySnapshottable.valueOf(i.getLastINode(), path); + public INodeDirectory getSnapshottableRoot(final String path) + throws IOException { + final INodeDirectory dir = INodeDirectory.valueOf(fsdir + .getINodesInPath4Write(path).getLastINode(), path); + if (!dir.isSnapshottable()) { + throw new SnapshotException( + "Directory is not a snapshottable directory: " + path); + } + return dir; } /** @@ -200,7 +206,7 @@ public class SnapshotManager implements */ public String createSnapshot(final String path, String snapshotName ) throws IOException { - INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path); + INodeDirectory srcRoot = getSnapshottableRoot(path); if (snapshotCounter == getMaxSnapshotID()) { // We have reached the maximum allowable snapshot ID and since we don't @@ -233,7 +239,7 @@ public class SnapshotManager implements // parse the path, and check if the path is a snapshot path // the INodeDirectorySnapshottable#valueOf method will throw Exception // if the path is not for a snapshottable directory - INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path); + INodeDirectory srcRoot = getSnapshottableRoot(path); srcRoot.removeSnapshot(snapshotName, collectedBlocks, removedINodes); numSnapshots.getAndDecrement(); } @@ -256,8 +262,7 @@ public class SnapshotManager implements final String newSnapshotName) throws IOException { // Find the source root directory path where the snapshot was taken. // All the check for path has been included in the valueOf method. 
- final INodeDirectorySnapshottable srcRoot - = INodeDirectorySnapshottable.valueOf(fsdir.getINode(path), path); + final INodeDirectory srcRoot = getSnapshottableRoot(path); // Note that renameSnapshot and createSnapshot are synchronized externally // through FSNamesystem's write lock srcRoot.renameSnapshot(path, oldSnapshotName, newSnapshotName); @@ -283,9 +288,26 @@ public class SnapshotManager implements snapshotCounter = counter; } - INodeDirectorySnapshottable[] getSnapshottableDirs() { + INodeDirectory[] getSnapshottableDirs() { return snapshottables.values().toArray( - new INodeDirectorySnapshottable[snapshottables.size()]); + new INodeDirectory[snapshottables.size()]); + } + + /** + * Write {@link #snapshotCounter}, {@link #numSnapshots}, + * and all snapshots to the DataOutput. + */ + public void write(DataOutput out) throws IOException { + out.writeInt(snapshotCounter); + out.writeInt(numSnapshots.get()); + + // write all snapshots. + for(INodeDirectory snapshottableDir : snapshottables.values()) { + for (Snapshot s : snapshottableDir.getDirectorySnapshottableFeature() + .getSnapshotList()) { + s.write(out); + } + } } /** @@ -321,16 +343,16 @@ public class SnapshotManager implements List<SnapshottableDirectoryStatus> statusList = new ArrayList<SnapshottableDirectoryStatus>(); - for (INodeDirectorySnapshottable dir : snapshottables.values()) { + for (INodeDirectory dir : snapshottables.values()) { if (userName == null || userName.equals(dir.getUserName())) { SnapshottableDirectoryStatus status = new SnapshottableDirectoryStatus( dir.getModificationTime(), dir.getAccessTime(), dir.getFsPermission(), dir.getUserName(), dir.getGroupName(), dir.getLocalNameBytes(), dir.getId(), dir.getChildrenNum(Snapshot.CURRENT_STATE_ID), - dir.getNumSnapshots(), - dir.getSnapshotQuota(), dir.getParent() == null ? 
- DFSUtil.EMPTY_BYTES : + dir.getDirectorySnapshottableFeature().getNumSnapshots(), + dir.getDirectorySnapshottableFeature().getSnapshotQuota(), + dir.getParent() == null ? DFSUtil.EMPTY_BYTES : DFSUtil.string2Bytes(dir.getParent().getFullPathName())); statusList.add(status); } @@ -344,21 +366,22 @@ public class SnapshotManager implements * Compute the difference between two snapshots of a directory, or between a * snapshot of the directory and its current tree. */ - public SnapshotDiffInfo diff(final String path, final String from, + public SnapshotDiffReport diff(final String path, final String from, final String to) throws IOException { + // Find the source root directory path where the snapshots were taken. + // All the check for path has been included in the valueOf method. + final INodeDirectory snapshotRoot = getSnapshottableRoot(path); + if ((from == null || from.isEmpty()) && (to == null || to.isEmpty())) { // both fromSnapshot and toSnapshot indicate the current tree - return null; + return new SnapshotDiffReport(path, from, to, + Collections.<DiffReportEntry> emptyList()); } - - // Find the source root directory path where the snapshots were taken. - // All the check for path has been included in the valueOf method. - INodesInPath inodesInPath = fsdir.getINodesInPath4Write(path.toString()); - final INodeDirectorySnapshottable snapshotRoot = INodeDirectorySnapshottable - .valueOf(inodesInPath.getLastINode(), path); - - return snapshotRoot.computeDiff(from, to); + final SnapshotDiffInfo diffs = snapshotRoot + .getDirectorySnapshottableFeature().computeDiff(snapshotRoot, from, to); + return diffs != null ? 
diffs.generateReport() : new SnapshotDiffReport( + path, from, to, Collections.<DiffReportEntry> emptyList()); } public void clearSnapshottableDirs() { @@ -391,7 +414,7 @@ public class SnapshotManager implements getSnapshottableDirectories() { List<SnapshottableDirectoryStatus.Bean> beans = new ArrayList<SnapshottableDirectoryStatus.Bean>(); - for (INodeDirectorySnapshottable d : getSnapshottableDirs()) { + for (INodeDirectory d : getSnapshottableDirs()) { beans.add(toBean(d)); } return beans.toArray(new SnapshottableDirectoryStatus.Bean[beans.size()]); @@ -400,20 +423,19 @@ public class SnapshotManager implements @Override // SnapshotStatsMXBean public SnapshotInfo.Bean[] getSnapshots() { List<SnapshotInfo.Bean> beans = new ArrayList<SnapshotInfo.Bean>(); - for (INodeDirectorySnapshottable d : getSnapshottableDirs()) { - for (Snapshot s : d.getSnapshotList()) { + for (INodeDirectory d : getSnapshottableDirs()) { + for (Snapshot s : d.getDirectorySnapshottableFeature().getSnapshotList()) { beans.add(toBean(s)); } } return beans.toArray(new SnapshotInfo.Bean[beans.size()]); } - public static SnapshottableDirectoryStatus.Bean toBean( - INodeDirectorySnapshottable d) { + public static SnapshottableDirectoryStatus.Bean toBean(INodeDirectory d) { return new SnapshottableDirectoryStatus.Bean( d.getFullPathName(), - d.getNumSnapshots(), - d.getSnapshotQuota(), + d.getDirectorySnapshottableFeature().getNumSnapshots(), + d.getDirectorySnapshottableFeature().getSnapshotQuota(), d.getModificationTime(), Short.valueOf(Integer.toOctalString( d.getFsPermissionShort())),
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Tue Aug 19 23:49:39 2014 @@ -28,6 +28,8 @@ import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; @@ -53,8 +55,11 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -81,6 +86,7 @@ import org.apache.hadoop.hdfs.web.resour import org.apache.hadoop.hdfs.web.resources.DeleteOpParam; import org.apache.hadoop.hdfs.web.resources.DestinationParam; import org.apache.hadoop.hdfs.web.resources.DoAsParam; +import 
org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam; import org.apache.hadoop.hdfs.web.resources.GetOpParam; import org.apache.hadoop.hdfs.web.resources.GroupParam; import org.apache.hadoop.hdfs.web.resources.HttpOpParam; @@ -88,6 +94,7 @@ import org.apache.hadoop.hdfs.web.resour import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam; import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam; import org.apache.hadoop.hdfs.web.resources.OffsetParam; +import org.apache.hadoop.hdfs.web.resources.OldSnapshotNameParam; import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.hdfs.web.resources.OwnerParam; import org.apache.hadoop.hdfs.web.resources.Param; @@ -98,20 +105,30 @@ import org.apache.hadoop.hdfs.web.resour import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam; import org.apache.hadoop.hdfs.web.resources.RenewerParam; import org.apache.hadoop.hdfs.web.resources.ReplicationParam; +import org.apache.hadoop.hdfs.web.resources.SnapshotNameParam; import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam; import org.apache.hadoop.hdfs.web.resources.UriFsPathParam; import org.apache.hadoop.hdfs.web.resources.UserParam; +import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam; +import org.apache.hadoop.hdfs.web.resources.XAttrNameParam; +import org.apache.hadoop.hdfs.web.resources.XAttrSetFlagParam; +import org.apache.hadoop.hdfs.web.resources.XAttrValueParam; +import org.apache.hadoop.hdfs.web.resources.FsActionParam; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; +import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import 
org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; +import com.google.common.collect.Lists; import com.sun.jersey.spi.container.ResourceFilters; /** Web-hdfs NameNode implementation. */ @@ -162,22 +179,44 @@ public class NamenodeWebHdfsMethods { //clear content type response.setContentType(null); + + // set the remote address, if coming in via a trust proxy server then + // the address with be that of the proxied client + REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request)); } + private void reset() { + REMOTE_ADDRESS.set(null); + } + private static NamenodeProtocols getRPCServer(NameNode namenode) throws IOException { final NamenodeProtocols np = namenode.getRpcServer(); if (np == null) { - throw new IOException("Namenode is in startup mode"); + throw new RetriableException("Namenode is in startup mode"); } return np; } - + @VisibleForTesting static DatanodeInfo chooseDatanode(final NameNode namenode, final String path, final HttpOpParam.Op op, final long openOffset, - final long blocksize) throws IOException { + final long blocksize, final String excludeDatanodes) throws IOException { final BlockManager bm = namenode.getNamesystem().getBlockManager(); + + HashSet<Node> excludes = new HashSet<Node>(); + if (excludeDatanodes != null) { + for (String host : StringUtils + .getTrimmedStringCollection(excludeDatanodes)) { + int idx = host.indexOf(":"); + if (idx != -1) { + excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr( + host.substring(0, idx), Integer.parseInt(host.substring(idx + 1)))); + } else { + excludes.add(bm.getDatanodeManager().getDatanodeByHost(host)); + } + } + } if (op == PutOpParam.Op.CREATE) { //choose a datanode near to client @@ -186,7 +225,7 @@ public class NamenodeWebHdfsMethods { if (clientNode != null) { final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy() .chooseTarget(path, 1, clientNode, - new ArrayList<DatanodeStorageInfo>(), false, 
null, blocksize, + new ArrayList<DatanodeStorageInfo>(), false, excludes, blocksize, // TODO: get storage type from the file StorageType.DEFAULT); if (storages.length > 0) { @@ -215,7 +254,7 @@ public class NamenodeWebHdfsMethods { final LocatedBlocks locations = np.getBlockLocations(path, offset, 1); final int count = locations.locatedBlockCount(); if (count > 0) { - return bestNode(locations.get(0).getLocations()); + return bestNode(locations.get(0).getLocations(), excludes); } } } @@ -229,11 +268,14 @@ public class NamenodeWebHdfsMethods { * sorted based on availability and network distances, thus it is sufficient * to return the first element of the node here. */ - private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException { - if (nodes.length == 0 || nodes[0].isDecommissioned()) { - throw new IOException("No active nodes contain this block"); + private static DatanodeInfo bestNode(DatanodeInfo[] nodes, + HashSet<Node> excludes) throws IOException { + for (DatanodeInfo dn: nodes) { + if (false == dn.isDecommissioned() && false == excludes.contains(dn)) { + return dn; + } } - return nodes[0]; + throw new IOException("No active nodes contain this block"); } private Token<? extends TokenIdentifier> generateDelegationToken( @@ -252,11 +294,12 @@ public class NamenodeWebHdfsMethods { final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final String path, final HttpOpParam.Op op, final long openOffset, - final long blocksize, + final long blocksize, final String excludeDatanodes, final Param<?, ?>... 
parameters) throws URISyntaxException, IOException { final DatanodeInfo dn; try { - dn = chooseDatanode(namenode, path, op, openOffset, blocksize); + dn = chooseDatanode(namenode, path, op, openOffset, blocksize, + excludeDatanodes); } catch (InvalidTopologyException ite) { throw new IOException("Failed to find datanode, suggest to check cluster health.", ite); } @@ -333,12 +376,25 @@ public class NamenodeWebHdfsMethods { @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT) final TokenArgumentParam delegationTokenArgument, @QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT) - final AclPermissionParam aclPermission - )throws IOException, InterruptedException { + final AclPermissionParam aclPermission, + @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) + final XAttrNameParam xattrName, + @QueryParam(XAttrValueParam.NAME) @DefaultValue(XAttrValueParam.DEFAULT) + final XAttrValueParam xattrValue, + @QueryParam(XAttrSetFlagParam.NAME) @DefaultValue(XAttrSetFlagParam.DEFAULT) + final XAttrSetFlagParam xattrSetFlag, + @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT) + final SnapshotNameParam snapshotName, + @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT) + final OldSnapshotNameParam oldSnapshotName, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes + ) throws IOException, InterruptedException { return put(ugi, delegation, username, doAsUser, ROOT, op, destination, owner, group, permission, overwrite, bufferSize, replication, blockSize, modificationTime, accessTime, renameOptions, createParent, - delegationTokenArgument,aclPermission); + delegationTokenArgument, aclPermission, xattrName, xattrValue, + xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes); } /** Handle HTTP PUT request. 
*/ @@ -384,25 +440,39 @@ public class NamenodeWebHdfsMethods { @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT) final TokenArgumentParam delegationTokenArgument, @QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT) - final AclPermissionParam aclPermission + final AclPermissionParam aclPermission, + @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) + final XAttrNameParam xattrName, + @QueryParam(XAttrValueParam.NAME) @DefaultValue(XAttrValueParam.DEFAULT) + final XAttrValueParam xattrValue, + @QueryParam(XAttrSetFlagParam.NAME) @DefaultValue(XAttrSetFlagParam.DEFAULT) + final XAttrSetFlagParam xattrSetFlag, + @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT) + final SnapshotNameParam snapshotName, + @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT) + final OldSnapshotNameParam oldSnapshotName, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, InterruptedException { init(ugi, delegation, username, doAsUser, path, op, destination, owner, group, permission, overwrite, bufferSize, replication, blockSize, - modificationTime, accessTime, renameOptions, delegationTokenArgument,aclPermission); + modificationTime, accessTime, renameOptions, delegationTokenArgument, + aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName, + oldSnapshotName, excludeDatanodes); return ugi.doAs(new PrivilegedExceptionAction<Response>() { @Override public Response run() throws IOException, URISyntaxException { - REMOTE_ADDRESS.set(request.getRemoteAddr()); try { return put(ugi, delegation, username, doAsUser, path.getAbsolutePath(), op, destination, owner, group, permission, overwrite, bufferSize, replication, blockSize, modificationTime, accessTime, renameOptions, createParent, - delegationTokenArgument,aclPermission); + delegationTokenArgument, 
aclPermission, xattrName, xattrValue, + xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes); } finally { - REMOTE_ADDRESS.set(null); + reset(); } } }); @@ -428,7 +498,13 @@ public class NamenodeWebHdfsMethods { final RenameOptionSetParam renameOptions, final CreateParentParam createParent, final TokenArgumentParam delegationTokenArgument, - final AclPermissionParam aclPermission + final AclPermissionParam aclPermission, + final XAttrNameParam xattrName, + final XAttrValueParam xattrValue, + final XAttrSetFlagParam xattrSetFlag, + final SnapshotNameParam snapshotName, + final OldSnapshotNameParam oldSnapshotName, + final ExcludeDatanodesParam exclDatanodes ) throws IOException, URISyntaxException { final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF); @@ -438,9 +514,10 @@ public class NamenodeWebHdfsMethods { switch(op.getValue()) { case CREATE: { - final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, - fullpath, op.getValue(), -1L, blockSize.getValue(conf), - permission, overwrite, bufferSize, replication, blockSize); + final URI uri = redirectURI(namenode, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf), + exclDatanodes.getValue(), permission, overwrite, bufferSize, + replication, blockSize); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } case MKDIRS: @@ -528,6 +605,28 @@ public class NamenodeWebHdfsMethods { np.setAcl(fullpath, aclPermission.getAclPermission(true)); return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build(); } + case SETXATTR: { + np.setXAttr( + fullpath, + XAttrHelper.buildXAttr(xattrName.getXAttrName(), + xattrValue.getXAttrValue()), xattrSetFlag.getFlag()); + return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build(); + } + case REMOVEXATTR: { + np.removeXAttr(fullpath, XAttrHelper.buildXAttr(xattrName.getXAttrName())); + return 
Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build(); + } + case CREATESNAPSHOT: { + String snapshotPath = np.createSnapshot(fullpath, snapshotName.getValue()); + final String js = JsonUtil.toJsonString( + org.apache.hadoop.fs.Path.class.getSimpleName(), snapshotPath); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } + case RENAMESNAPSHOT: { + np.renameSnapshot(fullpath, oldSnapshotName.getValue(), + snapshotName.getValue()); + return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build(); + } default: throw new UnsupportedOperationException(op + " is not supported"); } @@ -551,9 +650,12 @@ public class NamenodeWebHdfsMethods { @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT) final ConcatSourcesParam concatSrcs, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) - final BufferSizeParam bufferSize + final BufferSizeParam bufferSize, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, InterruptedException { - return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize); + return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, + bufferSize, excludeDatanodes); } /** Handle HTTP POST request. 
*/ @@ -575,20 +677,23 @@ public class NamenodeWebHdfsMethods { @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT) final ConcatSourcesParam concatSrcs, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) - final BufferSizeParam bufferSize + final BufferSizeParam bufferSize, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, InterruptedException { - init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize); + init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize, + excludeDatanodes); return ugi.doAs(new PrivilegedExceptionAction<Response>() { @Override public Response run() throws IOException, URISyntaxException { - REMOTE_ADDRESS.set(request.getRemoteAddr()); try { return post(ugi, delegation, username, doAsUser, - path.getAbsolutePath(), op, concatSrcs, bufferSize); + path.getAbsolutePath(), op, concatSrcs, bufferSize, + excludeDatanodes); } finally { - REMOTE_ADDRESS.set(null); + reset(); } } }); @@ -602,15 +707,17 @@ public class NamenodeWebHdfsMethods { final String fullpath, final PostOpParam op, final ConcatSourcesParam concatSrcs, - final BufferSizeParam bufferSize + final BufferSizeParam bufferSize, + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, URISyntaxException { final NameNode namenode = (NameNode)context.getAttribute("name.node"); switch(op.getValue()) { case APPEND: { - final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, - fullpath, op.getValue(), -1L, -1L, bufferSize); + final URI uri = redirectURI(namenode, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), -1L, -1L, + excludeDatanodes.getValue(), bufferSize); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } case CONCAT: @@ -644,10 +751,18 @@ public class NamenodeWebHdfsMethods { @QueryParam(RenewerParam.NAME) 
@DefaultValue(RenewerParam.DEFAULT) final RenewerParam renewer, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) - final BufferSizeParam bufferSize + final BufferSizeParam bufferSize, + @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) + final List<XAttrNameParam> xattrNames, + @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT) + final XAttrEncodingParam xattrEncoding, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes, + @QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT) + final FsActionParam fsAction ) throws IOException, InterruptedException { - return get(ugi, delegation, username, doAsUser, ROOT, op, - offset, length, renewer, bufferSize); + return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length, + renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes, fsAction); } /** Handle HTTP GET request. 
*/ @@ -672,21 +787,29 @@ public class NamenodeWebHdfsMethods { @QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT) final RenewerParam renewer, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) - final BufferSizeParam bufferSize + final BufferSizeParam bufferSize, + @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) + final List<XAttrNameParam> xattrNames, + @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT) + final XAttrEncodingParam xattrEncoding, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes, + @QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT) + final FsActionParam fsAction ) throws IOException, InterruptedException { - init(ugi, delegation, username, doAsUser, path, op, - offset, length, renewer, bufferSize); + init(ugi, delegation, username, doAsUser, path, op, offset, length, + renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction); return ugi.doAs(new PrivilegedExceptionAction<Response>() { @Override public Response run() throws IOException, URISyntaxException { - REMOTE_ADDRESS.set(request.getRemoteAddr()); try { return get(ugi, delegation, username, doAsUser, - path.getAbsolutePath(), op, offset, length, renewer, bufferSize); + path.getAbsolutePath(), op, offset, length, renewer, bufferSize, + xattrNames, xattrEncoding, excludeDatanodes, fsAction); } finally { - REMOTE_ADDRESS.set(null); + reset(); } } }); @@ -702,7 +825,11 @@ public class NamenodeWebHdfsMethods { final OffsetParam offset, final LengthParam length, final RenewerParam renewer, - final BufferSizeParam bufferSize + final BufferSizeParam bufferSize, + final List<XAttrNameParam> xattrNames, + final XAttrEncodingParam xattrEncoding, + final ExcludeDatanodesParam excludeDatanodes, + final FsActionParam fsAction ) throws IOException, URISyntaxException { final NameNode namenode = 
(NameNode)context.getAttribute("name.node"); final NamenodeProtocols np = getRPCServer(namenode); @@ -710,8 +837,9 @@ public class NamenodeWebHdfsMethods { switch(op.getValue()) { case OPEN: { - final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, - fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize); + final URI uri = redirectURI(namenode, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), offset.getValue(), -1L, + excludeDatanodes.getValue(), offset, length, bufferSize); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } case GET_BLOCK_LOCATIONS: @@ -747,7 +875,7 @@ public class NamenodeWebHdfsMethods { case GETFILECHECKSUM: { final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, - fullpath, op.getValue(), -1L, -1L); + fullpath, op.getValue(), -1L, -1L, null); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } case GETDELEGATIONTOKEN: @@ -777,6 +905,31 @@ public class NamenodeWebHdfsMethods { final String js = JsonUtil.toJsonString(status); return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); } + case GETXATTRS: { + List<String> names = null; + if (xattrNames != null) { + names = Lists.newArrayListWithCapacity(xattrNames.size()); + for (XAttrNameParam xattrName : xattrNames) { + if (xattrName.getXAttrName() != null) { + names.add(xattrName.getXAttrName()); + } + } + } + List<XAttr> xAttrs = np.getXAttrs(fullpath, (names != null && + !names.isEmpty()) ? 
XAttrHelper.buildXAttrs(names) : null); + final String js = JsonUtil.toJsonString(xAttrs, + xattrEncoding.getEncoding()); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } + case LISTXATTRS: { + final List<XAttr> xAttrs = np.listXAttrs(fullpath); + final String js = JsonUtil.toJsonString(xAttrs); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } + case CHECKACCESS: { + np.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue())); + return Response.ok().build(); + } default: throw new UnsupportedOperationException(op + " is not supported"); } @@ -860,9 +1013,12 @@ public class NamenodeWebHdfsMethods { @QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT) final DeleteOpParam op, @QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT) - final RecursiveParam recursive + final RecursiveParam recursive, + @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT) + final SnapshotNameParam snapshotName ) throws IOException, InterruptedException { - return delete(ugi, delegation, username, doAsUser, ROOT, op, recursive); + return delete(ugi, delegation, username, doAsUser, ROOT, op, recursive, + snapshotName); } /** Handle HTTP DELETE request. 
*/ @@ -881,20 +1037,21 @@ public class NamenodeWebHdfsMethods { @QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT) final DeleteOpParam op, @QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT) - final RecursiveParam recursive + final RecursiveParam recursive, + @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT) + final SnapshotNameParam snapshotName ) throws IOException, InterruptedException { - init(ugi, delegation, username, doAsUser, path, op, recursive); + init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName); return ugi.doAs(new PrivilegedExceptionAction<Response>() { @Override public Response run() throws IOException { - REMOTE_ADDRESS.set(request.getRemoteAddr()); try { return delete(ugi, delegation, username, doAsUser, - path.getAbsolutePath(), op, recursive); + path.getAbsolutePath(), op, recursive, snapshotName); } finally { - REMOTE_ADDRESS.set(null); + reset(); } } }); @@ -907,17 +1064,22 @@ public class NamenodeWebHdfsMethods { final DoAsParam doAsUser, final String fullpath, final DeleteOpParam op, - final RecursiveParam recursive + final RecursiveParam recursive, + final SnapshotNameParam snapshotName ) throws IOException { final NameNode namenode = (NameNode)context.getAttribute("name.node"); + final NamenodeProtocols np = getRPCServer(namenode); switch(op.getValue()) { - case DELETE: - { - final boolean b = getRPCServer(namenode).delete(fullpath, recursive.getValue()); + case DELETE: { + final boolean b = np.delete(fullpath, recursive.getValue()); final String js = JsonUtil.toJsonString("boolean", b); return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); } + case DELETESNAPSHOT: { + np.deleteSnapshot(fullpath, snapshotName.getValue()); + return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build(); + } default: throw new UnsupportedOperationException(op + " is not supported"); } Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Tue Aug 19 23:49:39 2014 @@ -21,6 +21,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; @@ -50,6 +51,7 @@ public class BlockCommand extends Datano final String poolId; final Block[] blocks; final DatanodeInfo[][] targets; + final StorageType[][] targetStorageTypes; final String[][] targetStorageIDs; /** @@ -62,17 +64,20 @@ public class BlockCommand extends Datano this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; + targetStorageTypes = new StorageType[blocks.length][]; targetStorageIDs = new String[blocks.length][]; for(int i = 0; i < blocks.length; i++) { BlockTargetPair p = blocktargetlist.get(i); blocks[i] = p.block; targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets); + targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets); targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets); } } private static final DatanodeInfo[][] 
EMPTY_TARGET_DATANODES = {}; + private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {}; private static final String[][] EMPTY_TARGET_STORAGEIDS = {}; /** @@ -81,7 +86,7 @@ public class BlockCommand extends Datano */ public BlockCommand(int action, String poolId, Block blocks[]) { this(action, poolId, blocks, EMPTY_TARGET_DATANODES, - EMPTY_TARGET_STORAGEIDS); + EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS); } /** @@ -89,11 +94,13 @@ public class BlockCommand extends Datano * @param blocks blocks related to the action */ public BlockCommand(int action, String poolId, Block[] blocks, - DatanodeInfo[][] targets, String[][] targetStorageIDs) { + DatanodeInfo[][] targets, StorageType[][] targetStorageTypes, + String[][] targetStorageIDs) { super(action); this.poolId = poolId; this.blocks = blocks; this.targets = targets; + this.targetStorageTypes = targetStorageTypes; this.targetStorageIDs = targetStorageIDs; } @@ -109,6 +116,10 @@ public class BlockCommand extends Datano return targets; } + public StorageType[][] getTargetStorageTypes() { + return targetStorageTypes; + } + public String[][] getTargetStorageIDs() { return targetStorageIDs; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java Tue Aug 19 23:49:39 2014 @@ -32,7 +32,6 @@ public class BlockIdCommand 
extends Data /** * Create BlockCommand for the given action - * @param blocks blocks related to the action */ public BlockIdCommand(int action, String poolId, long[] blockIds) { super(action); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java Tue Aug 19 23:49:39 2014 @@ -17,10 +17,9 @@ */ package org.apache.hadoop.hdfs.server.protocol; -import java.util.Arrays; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; /** @@ -39,12 +38,15 @@ public class BlocksWithLocations { final Block block; final String[] datanodeUuids; final String[] storageIDs; + final StorageType[] storageTypes; /** constructor */ - public BlockWithLocations(Block block, String[] datanodeUuids, String[] storageIDs) { + public BlockWithLocations(Block block, String[] datanodeUuids, + String[] storageIDs, StorageType[] storageTypes) { this.block = block; this.datanodeUuids = datanodeUuids; this.storageIDs = storageIDs; + this.storageTypes = storageTypes; } /** get the block */ @@ -61,7 +63,12 @@ public class BlocksWithLocations { public String[] getStorageIDs() { return storageIDs; } - + + /** @return the storage types */ + public 
StorageType[] getStorageTypes() { + return storageTypes; + } + @Override public String toString() { final StringBuilder b = new StringBuilder(); @@ -70,12 +77,18 @@ public class BlocksWithLocations { return b.append("[]").toString(); } - b.append(storageIDs[0]).append('@').append(datanodeUuids[0]); + appendString(0, b.append("[")); for(int i = 1; i < datanodeUuids.length; i++) { - b.append(", ").append(storageIDs[i]).append("@").append(datanodeUuids[i]); + appendString(i, b.append(",")); } return b.append("]").toString(); } + + private StringBuilder appendString(int i, StringBuilder b) { + return b.append("[").append(storageTypes[i]).append("]") + .append(storageIDs[i]) + .append("@").append(datanodeUuids[i]); + } } private final BlockWithLocations[] blocks; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Tue Aug 19 23:49:39 2014 @@ -119,9 +119,9 @@ public interface DatanodeProtocol { * and should be deleted. This function is meant to upload *all* * the locally-stored blocks. It's invoked upon startup and then * infrequently afterwards. 
- * @param registration - * @param poolId - the block pool ID for the blocks - * @param reports - report of blocks per storage + * @param registration datanode registration + * @param poolId the block pool ID for the blocks + * @param reports report of blocks per storage * Each finalized block is represented as 3 longs. Each under- * construction replica is represented as 4 longs. * This is done instead of Block[] to reduce memory used by block reports. Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java Tue Aug 19 23:49:39 2014 @@ -48,8 +48,6 @@ public class DatanodeStorage { /** * Create a storage with {@link State#NORMAL} and {@link StorageType#DEFAULT}. 
- * - * @param storageID */ public DatanodeStorage(String storageID) { this(storageID, State.NORMAL, StorageType.DEFAULT); @@ -84,6 +82,11 @@ public class DatanodeStorage { } @Override + public String toString() { + return "DatanodeStorage["+ storageID + "," + storageType + "," + state +"]"; + } + + @Override public boolean equals(Object other){ if (other == this) { return true; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java Tue Aug 19 23:49:39 2014 @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.C import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.GenericRefreshProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol; /** The full set of RPC methods implemented by the Namenode. 
*/ @@ -35,6 +36,7 @@ public interface NamenodeProtocols RefreshAuthorizationPolicyProtocol, RefreshUserMappingsProtocol, RefreshCallQueueProtocol, + GenericRefreshProtocol, GetUserMappingsProtocol, HAServiceProtocol { } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java Tue Aug 19 23:49:39 2014 @@ -22,6 +22,9 @@ import org.apache.hadoop.classification. /** * A RegisterCommand is an instruction to a datanode to register with the namenode. + * This command can't be combined with other commands in the same response. + * This is because after the datanode processes RegisterCommand, it will skip + * the rest of the DatanodeCommands in the same HeartbeatResponse. 
*/ @InterfaceAudience.Private @InterfaceStability.Evolving Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java Tue Aug 19 23:49:39 2014 @@ -39,7 +39,7 @@ public abstract class ServerCommand { * * @see DatanodeProtocol * @see NamenodeProtocol - * @param action + * @param action protocol specific action */ public ServerCommand(int action) { this.action = action; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java Tue Aug 19 23:49:39 2014 @@ -32,11 +32,16 @@ import com.google.common.base.Preconditi * DfsClientShm is a subclass of ShortCircuitShm which is used by the * DfsClient. 
* When the UNIX domain socket associated with this shared memory segment - * closes unexpectedly, we mark the slots inside this segment as stale. - * ShortCircuitReplica objects that contain stale slots are themselves stale, + * closes unexpectedly, we mark the slots inside this segment as disconnected. + * ShortCircuitReplica objects that contain disconnected slots are stale, * and will not be used to service new reads or mmap operations. * However, in-progress read or mmap operations will continue to proceed. * Once the last slot is deallocated, the segment can be safely munmapped. + * + * Slots may also become stale because the associated replica has been deleted + * on the DataNode. In this case, the DataNode will clear the 'valid' bit. + * The client will then see these slots as stale (see + * {@link ShortCircuitReplica#isStale}). */ public class DfsClientShm extends ShortCircuitShm implements DomainSocketWatcher.Handler { @@ -58,7 +63,7 @@ public class DfsClientShm extends ShortC * * {@link DfsClientShm#handle} sets this to true. */ - private boolean stale = false; + private boolean disconnected = false; DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager, DomainPeer peer) throws IOException { @@ -76,14 +81,14 @@ public class DfsClientShm extends ShortC } /** - * Determine if the shared memory segment is stale. + * Determine if the shared memory segment is disconnected from the DataNode. * * This must be called with the DfsClientShmManager lock held. * * @return True if the shared memory segment is disconnected. 
*/ - public synchronized boolean isStale() { - return stale; + public synchronized boolean isDisconnected() { + return disconnected; } /** @@ -97,8 +102,8 @@ public class DfsClientShm extends ShortC public boolean handle(DomainSocket sock) { manager.unregisterShm(getShmId()); synchronized (this) { - Preconditions.checkState(!stale); - stale = true; + Preconditions.checkState(!disconnected); + disconnected = true; boolean hadSlots = false; for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) { Slot slot = iter.next(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java Tue Aug 19 23:49:39 2014 @@ -271,12 +271,12 @@ public class DfsClientShmManager impleme loading = false; finishedLoading.signalAll(); } - if (shm.isStale()) { + if (shm.isDisconnected()) { // If the peer closed immediately after the shared memory segment // was created, the DomainSocketWatcher callback might already have - // fired and marked the shm as stale. In this case, we obviously - // don't want to add the SharedMemorySegment to our list of valid - // not-full segments. + // fired and marked the shm as disconnected. In this case, we + // obviously don't want to add the SharedMemorySegment to our list + // of valid not-full segments. 
if (LOG.isDebugEnabled()) { LOG.debug(this + ": the UNIX domain socket associated with " + "this short-circuit memory closed before we could make " + @@ -299,7 +299,7 @@ public class DfsClientShmManager impleme void freeSlot(Slot slot) { DfsClientShm shm = (DfsClientShm)slot.getShm(); shm.unregisterSlot(slot.getSlotIdx()); - if (shm.isStale()) { + if (shm.isDisconnected()) { // Stale shared memory segments should not be tracked here. Preconditions.checkState(!full.containsKey(shm.getShmId())); Preconditions.checkState(!notFull.containsKey(shm.getShmId())); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java Tue Aug 19 23:49:39 2014 @@ -111,7 +111,7 @@ public class ShortCircuitCache implement Long evictionTimeNs = Long.valueOf(0); while (true) { Entry<Long, ShortCircuitReplica> entry = - evictableMmapped.ceilingEntry(evictionTimeNs); + evictable.ceilingEntry(evictionTimeNs); if (entry == null) break; evictionTimeNs = entry.getKey(); long evictionTimeMs = @@ -384,10 +384,6 @@ public class ShortCircuitCache implement this.shmManager = shmManager; } - public long getMmapRetryTimeoutMs() { - return mmapRetryTimeoutMs; - } - public long getStaleThresholdMs() { return staleThresholdMs; } @@ -437,11 +433,22 @@ public class ShortCircuitCache implement void unref(ShortCircuitReplica 
replica) { lock.lock(); try { - // If the replica is stale, but we haven't purged it yet, let's do that. - // It would be a shame to evict a non-stale replica so that we could put - // a stale one into the cache. - if ((!replica.purged) && replica.isStale()) { - purge(replica); + // If the replica is stale or unusable, but we haven't purged it yet, + // let's do that. It would be a shame to evict a non-stale replica so + // that we could put a stale or unusable one into the cache. + if (!replica.purged) { + String purgeReason = null; + if (!replica.getDataStream().getChannel().isOpen()) { + purgeReason = "purging replica because its data channel is closed."; + } else if (!replica.getMetaStream().getChannel().isOpen()) { + purgeReason = "purging replica because its meta channel is closed."; + } else if (replica.isStale()) { + purgeReason = "purging replica because it is stale."; + } + if (purgeReason != null) { + LOG.debug(this + ": " + purgeReason); + purge(replica); + } } String addedString = ""; boolean shouldTrimEvictionMaps = false; @@ -836,7 +843,7 @@ public class ShortCircuitCache implement } else if (replica.mmapData instanceof Long) { long lastAttemptTimeMs = (Long)replica.mmapData; long delta = Time.monotonicNow() - lastAttemptTimeMs; - if (delta < staleThresholdMs) { + if (delta < mmapRetryTimeoutMs) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": can't create client mmap for " + replica + " because we failed to " + Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java Tue Aug 19 23:49:39 2014 @@ -306,6 +306,13 @@ public class ShortCircuitShm { (slotAddress - baseAddress) / BYTES_PER_SLOT); } + /** + * Clear the slot. + */ + void clear() { + unsafe.putLongVolatile(null, this.slotAddress, 0); + } + private boolean isSet(long flag) { long prev = unsafe.getLongVolatile(null, this.slotAddress); return (prev & flag) != 0; @@ -535,6 +542,7 @@ public class ShortCircuitShm { } allocatedSlots.set(idx, true); Slot slot = new Slot(calculateSlotAddress(idx), blockId); + slot.clear(); slot.makeValid(); slots[idx] = slot; if (LOG.isTraceEnabled()) { @@ -583,7 +591,7 @@ public class ShortCircuitShm { Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId); if (!slot.isValid()) { throw new InvalidRequestException(this + ": slot " + slotIdx + - " has not been allocated."); + " is not marked as valid."); } slots[slotIdx] = slot; allocatedSlots.set(slotIdx, true); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java Tue Aug 19 23:49:39 2014 @@ -40,7 +40,8 @@ import org.apache.hadoop.hdfs.protocol.C import 
org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolStats; -import org.apache.hadoop.hdfs.tools.TableListing.Justification; +import org.apache.hadoop.tools.TableListing; +import org.apache.hadoop.tools.TableListing.Justification; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; @@ -503,19 +504,21 @@ public class CacheAdmin extends Configur @Override public String getShortUsage() { - return "[" + getName() + " [-stats] [-path <path>] [-pool <pool>]]\n"; + return "[" + getName() + + " [-stats] [-path <path>] [-pool <pool>] [-id <id>]\n"; } @Override public String getLongUsage() { TableListing listing = getOptionDescriptionListing(); + listing.addRow("-stats", "List path-based cache directive statistics."); listing.addRow("<path>", "List only " + "cache directives with this path. " + "Note that if there is a cache directive for <path> " + "in a cache pool that we don't have read access for, it " + "will not be listed."); listing.addRow("<pool>", "List only path cache directives in that pool."); - listing.addRow("-stats", "List path-based cache directive statistics."); + listing.addRow("<id>", "List the cache directive with this id."); return getShortUsage() + "\n" + "List cache directives.\n\n" + listing.toString(); @@ -534,6 +537,10 @@ public class CacheAdmin extends Configur builder.setPool(poolFilter); } boolean printStats = StringUtils.popOption("-stats", args); + String idFilter = StringUtils.popOptionWithArgument("-id", args); + if (idFilter != null) { + builder.setId(Long.parseLong(idFilter)); + } if (!args.isEmpty()) { System.err.println("Can't understand argument: " + args.get(0)); return 1;