Author: jing9 Date: Tue Aug 12 19:32:09 2014 New Revision: 1617568 URL: http://svn.apache.org/r1617568 Log: HDFS-6835. Archival Storage: Add a new API to set storage policy. Contributed by Jing Zhao.
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java Tue Aug 12 19:32:09 2014 @@ -68,6 +68,17 @@ public class BlockStoragePolicy { public BlockStoragePolicy getDefaultPolicy() { return getPolicy(defaultPolicyID); } + + public BlockStoragePolicy getPolicy(String policyName) { + if (policies != null) { + for (BlockStoragePolicy policy : policies) { + if (policy != null && policy.name.equals(policyName)) { + return policy; + } + } + } + return null; + } } /** A 4-bit policy ID */ @@ -172,6 +183,10 @@ public class BlockStoragePolicy { + ", replicationFallbacks=" + Arrays.asList(replicationFallbacks); } + public byte getId() { + return id; + } + private static StorageType getFallback(EnumSet<StorageType> unavailables, StorageType[] fallbacks) { for(StorageType fb : fallbacks) { Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug 12 19:32:09 2014 @@ -1646,6 +1646,25 @@ public class DFSClient implements java.i } /** + * Set storage policy for an existing file + * @param src file name + * @param policyName name of the storage policy + */ + public void setStoragePolicy(String src, String policyName) + throws IOException { + try { + namenode.setStoragePolicy(src, policyName); + } catch (RemoteException e) { + throw e.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + SafeModeException.class, + NSQuotaExceededException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } + } + + /** * Rename file or directory. * @see ClientProtocol#rename(String, String) * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead. Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Tue Aug 12 19:32:09 2014 @@ -461,7 +461,33 @@ public class DistributedFileSystem exten } }.resolve(this, absF); } - + + public void setStoragePolicy(final Path src, final String policyName) + throws IOException { + statistics.incrementWriteOps(1); + Path absF = fixRelativePart(src); + new FileSystemLinkResolver<Void>() { + @Override + public Void doCall(final Path p) + throws IOException, UnresolvedLinkException { + dfs.setStoragePolicy(getPathName(p), policyName); + return null; + } + @Override + public Void next(final FileSystem fs, final Path p) + throws IOException { + if (fs instanceof DistributedFileSystem) { + ((DistributedFileSystem) fs).setStoragePolicy(p, policyName); + return null; + } else { + throw new UnsupportedOperationException( + "Cannot perform setStoragePolicy on a non-DistributedFileSystem: " + + src + " -> " + p); + } + } + }.resolve(this, absF); + } + /** * Move blocks from srcs to trg and delete srcs afterwards. * The file block sizes must be the same. Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Aug 12 19:32:09 2014 @@ -255,6 +255,20 @@ public interface ClientProtocol { SnapshotAccessControlException, IOException; /** + * Set the storage policy for an existing file + * @param src Path of an existing file. + * @param policyName The name of the storage policy + * @throws SnapshotAccessControlException If access is denied + * @throws UnresolvedLinkException if <code>src</code> contains a symlink + * @throws FileNotFoundException If file/dir <code>src</code> is not found + * @throws QuotaExceededException If changes violate the quota restriction + */ + @Idempotent + public void setStoragePolicy(String src, String policyName) + throws SnapshotAccessControlException, UnresolvedLinkException, + FileNotFoundException, QuotaExceededException, IOException; + + /** * Set permissions for an existing file/directory. * * @throws AccessControlException If access is denied Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Tue Aug 12 19:32:09 2014 @@ -168,6 +168,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; @@ -225,6 +227,8 @@ public class ClientNamenodeProtocolServe static final GetSnapshottableDirListingResponseProto NULL_GET_SNAPSHOTTABLE_DIR_LISTING_RESPONSE = GetSnapshottableDirListingResponseProto.newBuilder().build(); + static final SetStoragePolicyResponseProto VOID_SET_STORAGE_POLICY_RESPONSE = + SetStoragePolicyResponseProto.newBuilder().build(); private static final CreateResponseProto VOID_CREATE_RESPONSE = CreateResponseProto.newBuilder().build(); @@ -1354,4 +1358,16 @@ public class ClientNamenodeProtocolServe } return VOID_CHECKACCESS_RESPONSE; } + + @Override + public SetStoragePolicyResponseProto setStoragePolicy( + RpcController controller, SetStoragePolicyRequestProto request) + throws ServiceException { + try { + server.setStoragePolicy(request.getSrc(), request.getPolicyName()); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_SET_STORAGE_POLICY_RESPONSE; + } } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Tue Aug 12 19:32:09 2014 @@ -60,7 +60,9 @@ import org.apache.hadoop.hdfs.protocol.H import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto; @@ -146,6 +148,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto; @@ -1359,4 +1362,17 @@ public class ClientNamenodeProtocolTrans throw ProtobufHelper.getRemoteException(e); } } + + @Override + public void setStoragePolicy(String src, String policyName) + throws SnapshotAccessControlException, UnresolvedLinkException, + FileNotFoundException, QuotaExceededException, IOException { + SetStoragePolicyRequestProto req = SetStoragePolicyRequestProto + .newBuilder().setSrc(src).setPolicyName(policyName).build(); + try { + rpcProxy.setStoragePolicy(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug 12 19:32:09 2014 @@ -395,7 +395,11 @@ public class BlockManager { lifetimeMin*60*1000L, 0, null, encryptionAlgorithm); } } - + + public BlockStoragePolicy getStoragePolicy(final String policyName) { + return storagePolicySuite.getPolicy(policyName); + } + public void setBlockPoolId(String blockPoolId) { if (isBlockTokenEnabled()) { blockTokenSecretManager.setBlockPoolId(blockPoolId); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Aug 12 19:32:09 2014 @@ -946,6 +946,28 @@ public class FSDirectory implements Clos return file.getBlocks(); } + /** Set block storage policy for a file */ + void setStoragePolicy(String src, byte policyId) + throws SnapshotAccessControlException, UnresolvedLinkException, + FileNotFoundException, QuotaExceededException { + writeLock(); + try { + unprotectedSetStoragePolicy(src, policyId); + } finally { + writeUnlock(); + } + } + + void unprotectedSetStoragePolicy(String src, byte policyId) + throws SnapshotAccessControlException, UnresolvedLinkException, + FileNotFoundException, QuotaExceededException { + assert hasWriteLock(); + final INodesInPath iip = getINodesInPath4Write(src, true); + // TODO: currently we only support setting storage policy on a file + final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src); + inode.setStoragePolicyID(policyId, iip.getLatestSnapshotId()); + } + /** * @param path the file path * @return the block size of the file. Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Aug 12 19:32:09 2014 @@ -82,6 +82,7 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; @@ -812,7 +813,16 @@ public class FSEditLog implements LogsPu .setReplication(replication); logEdit(op); } - + + /** + * Add set storage policy id record to edit log + */ + void logSetStoragePolicy(String src, byte policyId) { + SetStoragePolicyOp op = SetStoragePolicyOp.getInstance(cache.get()) + .setPath(src).setPolicyId(policyId); + logEdit(op); + } + /** Add set namespace quota record to edit log * * @param src the string representation of the path to a directory Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Aug 12 19:32:09 2014 @@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp; @@ -827,6 +828,13 @@ public class FSEditLogLoader { } break; } + case OP_SET_STORAGE_POLICY: { + SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op; + fsDir.unprotectedSetStoragePolicy( + renameReservedPathsOnUpgrade(setStoragePolicyOp.path, logVersion), + setStoragePolicyOp.policyId); + break; + } default: throw new IOException("Invalid operation read " + op.opCode); } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Aug 12 19:32:09 2014 @@ -61,6 +61,7 @@ import static org.apache.hadoop.hdfs.ser import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_STORAGE_POLICY; import java.io.DataInput; import java.io.DataInputStream; @@ -193,6 +194,7 @@ public abstract class FSEditLogOp { OP_ROLLING_UPGRADE_FINALIZE, "finalize")); inst.put(OP_SET_XATTR, new SetXAttrOp()); inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp()); + inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp()); } public FSEditLogOp get(FSEditLogOpCodes opcode) { @@ -3780,6 +3782,71 @@ public abstract class FSEditLogOp { } } + /** {@literal @Idempotent} for {@link ClientProtocol#setStoragePolicy} */ + static class SetStoragePolicyOp extends FSEditLogOp { + String path; + byte policyId; + + private SetStoragePolicyOp() { + super(OP_SET_STORAGE_POLICY); + } + + static SetStoragePolicyOp getInstance(OpInstanceCache cache) { + return (SetStoragePolicyOp) cache.get(OP_SET_STORAGE_POLICY); + } + + SetStoragePolicyOp setPath(String path) { + this.path = path; + return this; + } + + SetStoragePolicyOp setPolicyId(byte policyId) { + this.policyId = policyId; + return this; + } + + @Override + public void writeFields(DataOutputStream out) throws IOException { + FSImageSerialization.writeString(path, out); + out.writeByte(policyId); + } + + @Override + void readFields(DataInputStream in, int logVersion) + throws IOException { + this.path = FSImageSerialization.readString(in); + this.policyId = in.readByte(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SetStoragePolicyOp [path="); + builder.append(path); + builder.append(", policyId="); + builder.append(policyId); + builder.append(", opCode="); + builder.append(opCode); + builder.append(", txid="); + builder.append(txid); + builder.append("]"); + return builder.toString(); + } + + @Override + protected void toXml(ContentHandler contentHandler) throws SAXException { + XMLUtils.addSaxString(contentHandler, "PATH", path); + XMLUtils.addSaxString(contentHandler, "POLICYID", + Byte.valueOf(policyId).toString()); + } + + @Override + void fromXml(Stanza st) throws InvalidXmlException { + this.path = st.getValue("PATH"); + this.policyId = Byte.valueOf(st.getValue("POLICYID")); + } + } + /** * Class for writing editlog ops */ Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Tue Aug 12 19:32:09 2014 @@ -72,6 +72,7 @@ public enum FSEditLogOpCodes { OP_ROLLING_UPGRADE_FINALIZE ((byte) 42), OP_SET_XATTR ((byte) 43), OP_REMOVE_XATTR ((byte) 44), + OP_SET_STORAGE_POLICY ((byte) 45), // Note that the current range of the valid OP code is 0~127 OP_INVALID ((byte) -1); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug 12 19:32:09 2014 @@ -154,6 +154,7 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; @@ -2219,6 +2220,52 @@ public class FSNamesystem implements Nam return isFile; } + /** + * Set the storage policy for an existing file. + * + * @param src file name + * @param policyName storage policy name + */ + void setStoragePolicy(String src, final String policyName) + throws IOException { + try { + setStoragePolicyInt(src, policyName); + } catch (AccessControlException e) { + logAuditEvent(false, "setStoragePolicy", src); + throw e; + } + } + + private void setStoragePolicyInt(String src, final String policyName) + throws IOException { + checkSuperuserPrivilege(); + checkOperation(OperationCategory.WRITE); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + waitForLoadingFSImage(); + HdfsFileStatus fileStat; + writeLock(); + try { + checkOperation(OperationCategory.WRITE); + checkNameNodeSafeMode("Cannot set storage policy for " + src); + src = FSDirectory.resolvePath(src, pathComponents, dir); + + // get the corresponding policy and make sure the policy name is valid + BlockStoragePolicy policy = blockManager.getStoragePolicy(policyName); + if (policy == null) { + throw new HadoopIllegalArgumentException( + "Cannot find a block policy with the name " + policyName); + } + dir.setStoragePolicy(src, policy.getId()); + getEditLog().logSetStoragePolicy(src, policy.getId()); + fileStat = getAuditFileInfo(src, false); + } finally { + writeUnlock(); + } + + getEditLog().logSync(); + logAuditEvent(true, "setStoragePolicy", src, null, fileStat); + } + long getPreferredBlockSize(String filename) throws IOException, UnresolvedLinkException { FSPermissionChecker pc = getPermissionChecker(); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Tue Aug 12 19:32:09 2014 @@ -372,6 +372,17 @@ public class INodeFile extends INodeWith return HeaderFormat.getStoragePolicyID(header); } + /** Set the policy id of the file */ + public final void setStoragePolicyID(byte policyId) { + header = HeaderFormat.STORAGE_POLICY_ID.BITS.combine(policyId, header); + } + + public final void setStoragePolicyID(byte policyId, int lastSnapshotId) + throws QuotaExceededException { + recordModification(lastSnapshotId); + setStoragePolicyID(policyId); + } + @Override public long getHeaderLong() { return header; Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Tue Aug 12 19:32:09 2014 @@ -579,7 +579,13 @@ class NameNodeRpcServer implements Namen throws IOException { return namesystem.setReplication(src, replication); } - + + @Override + public void setStoragePolicy(String src, String policyName) + throws IOException { + namesystem.setStoragePolicy(src, policyName); + } + @Override // ClientProtocol public void setPermission(String src, FsPermission permissions) throws IOException { Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Tue Aug 12 19:32:09 2014 @@ -97,6 +97,14 @@ message SetReplicationResponseProto { required bool result = 1; } +message SetStoragePolicyRequestProto { + required string src = 1; + required string policyName = 2; +} + +message SetStoragePolicyResponseProto { // void response +} + message SetPermissionRequestProto { required string src = 1; required FsPermissionProto permission = 2; @@ -671,6 +679,8 @@ service ClientNamenodeProtocol { rpc append(AppendRequestProto) returns(AppendResponseProto); rpc setReplication(SetReplicationRequestProto) returns(SetReplicationResponseProto); + rpc setStoragePolicy(SetStoragePolicyRequestProto) + returns(SetStoragePolicyResponseProto); rpc setPermission(SetPermissionRequestProto) returns(SetPermissionResponseProto); rpc setOwner(SetOwnerRequestProto) returns(SetOwnerResponseProto); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java?rev=1617568&r1=1617567&r2=1617568&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java Tue Aug 12 19:32:09 2014 @@ -17,12 +17,19 @@ */ package org.apache.hadoop.hdfs; +import java.io.FileNotFoundException; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.server.namenode.FSDirectory; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Test; @@ -30,8 +37,10 @@ import org.junit.Test; public class TestBlockStoragePolicy { public static final BlockStoragePolicy.Suite POLICY_SUITE; public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY; + public static final Configuration conf; + static { - final Configuration conf = new HdfsConfiguration(); + conf = new HdfsConfiguration(); POLICY_SUITE = BlockStoragePolicy.readBlockStorageSuite(conf); DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy(); } @@ -41,11 +50,15 @@ public class TestBlockStoragePolicy { static final EnumSet<StorageType> disk = EnumSet.of(StorageType.DISK); static final EnumSet<StorageType> both = EnumSet.of(StorageType.DISK, StorageType.ARCHIVE); + static final long FILE_LEN = 1024; + static final short REPLICATION = 3; + + static final byte COLD = (byte) 4; + static final byte WARM = (byte) 8; + static final byte HOT = (byte) 12; + @Test public void testDefaultPolicies() throws Exception { - final byte COLD = (byte)4; - final byte WARM = (byte)8; - final byte HOT = (byte)12; final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>(); expectedPolicyStrings.put(COLD, "BlockStoragePolicy{COLD:4, storageTypes=[ARCHIVE], creationFallbacks=[], replicationFallbacks=[]"); @@ -119,4 +132,81 @@ public class TestBlockStoragePolicy { Assert.assertEquals(diskExpected, policy.getReplicationFallback(disk)); Assert.assertEquals(null, policy.getReplicationFallback(both)); } + + @Test + public void testSetStoragePolicy() throws Exception { + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(REPLICATION).build(); + cluster.waitActive(); + FSDirectory fsdir = cluster.getNamesystem().getFSDirectory(); + final DistributedFileSystem fs = cluster.getFileSystem(); + try { + final Path dir = new Path("/testSetStoragePolicy"); + final Path fooFile = new Path(dir, "foo"); + final Path barDir = new Path(dir, "bar"); + final Path barFile1= new Path(barDir, "f1"); + final Path barFile2= new Path(barDir, "f2"); + DFSTestUtil.createFile(fs, fooFile, FILE_LEN, REPLICATION, 0L); + DFSTestUtil.createFile(fs, barFile1, FILE_LEN, REPLICATION, 0L); + DFSTestUtil.createFile(fs, barFile2, FILE_LEN, REPLICATION, 0L); + + final String invalidPolicyName = "INVALID-POLICY"; + try { + fs.setStoragePolicy(fooFile, invalidPolicyName); + Assert.fail("Should throw a HadoopIllegalArgumentException"); + } catch (RemoteException e) { + GenericTestUtils.assertExceptionContains(invalidPolicyName, e); + } + + // check internal status + INodeFile fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile(); + INodeFile barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile(); + INodeFile barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile(); + + final Path invalidPath = new Path("/invalidPath"); + try { + fs.setStoragePolicy(invalidPath, "WARM"); + Assert.fail("Should throw a FileNotFoundException"); + } catch (FileNotFoundException e) { + GenericTestUtils.assertExceptionContains(invalidPath.toString(), e); + } + + fs.setStoragePolicy(fooFile, "COLD"); + fs.setStoragePolicy(barFile1, "WARM"); + fs.setStoragePolicy(barFile2, "WARM"); + // TODO: set storage policy on a directory + + // check internal status + Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID()); + Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID()); + Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID()); + + // restart namenode to make sure the editlog is correct + cluster.restartNameNode(true); + fsdir = cluster.getNamesystem().getFSDirectory(); + fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile(); + Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID()); + barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile(); + Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID()); + barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile(); + Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID()); + + // restart namenode with checkpoint to make sure the fsimage is correct + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + cluster.restartNameNode(true); + fsdir = cluster.getNamesystem().getFSDirectory(); + fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile(); + Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID()); + barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile(); + Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID()); + barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile(); + Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } }