Author: szetszwo
Date: Sat Aug 16 20:58:20 2014
New Revision: 1618416

URL: http://svn.apache.org/r1618416
Log:
HDFS-6847. Support storage policy on directories and include storage policy in 
HdfsFileStatus.  Contributed by Jing Zhao

Modified:
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
    
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
(original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
Sat Aug 16 20:58:20 2014
@@ -20,6 +20,9 @@ HDFS-6584: Archival Storage
     HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage
     types are unavailable.  (szetszwo)
 
+    HDFS-6847. Support storage policy on directories and include storage 
policy 
+    in HdfsFileStatus.  (Jing Zhao via szetszwo)
+
 Trunk (Unreleased)
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
 Sat Aug 16 20:58:20 2014
@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttr.NameSpace;
 
 /**
  * A block storage policy describes how to select the storage types
@@ -44,9 +46,13 @@ public class BlockStoragePolicy {
       = "dfs.block.storage.policy.creation-fallback.";
   public static final String 
DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX
       = "dfs.block.storage.policy.replication-fallback.";
+  public static final String STORAGE_POLICY_XATTR_NAME = "bsp";
+  /** set the namespace to TRUSTED so that only privilege users can access */
+  public static final NameSpace XAttrNS = NameSpace.TRUSTED;
 
   public static final int ID_BIT_LENGTH = 4;
   public static final int ID_MAX = (1 << ID_BIT_LENGTH) - 1;
+  public static final byte ID_UNSPECIFIED = 0;
 
   /** A block storage policy suite. */
   public static class Suite {
@@ -299,6 +305,20 @@ public class BlockStoragePolicy {
     return new Suite(firstID, policies);
   }
 
+  public static String buildXAttrName() {
+    return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME;
+  }
+
+  public static XAttr buildXAttr(byte policyId) {
+    final String name = buildXAttrName();
+    return XAttrHelper.buildXAttr(name, new byte[] { policyId });
+  }
+
+  public static boolean isStoragePolicyXAttr(XAttr xattr) {
+    return xattr != null && xattr.getNameSpace() == BlockStoragePolicy.XAttrNS
+        && 
xattr.getName().equals(BlockStoragePolicy.STORAGE_POLICY_XATTR_NAME);
+  }
+
   private static void throwIllegalArgumentException(String message,
       Configuration conf) {
     throw new IllegalArgumentException(message + " in "

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
 Sat Aug 16 20:58:20 2014
@@ -1646,8 +1646,8 @@ public class DFSClient implements java.i
   }
 
   /**
-   * Set storage policy for an existing file
-   * @param src file name
+   * Set storage policy for an existing file/directory
+   * @param src file/directory name
    * @param policyName name of the storage policy
    */
   public void setStoragePolicy(String src, String policyName)

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
 Sat Aug 16 20:58:20 2014
@@ -255,8 +255,8 @@ public interface ClientProtocol {
       SnapshotAccessControlException, IOException;
 
   /**
-   * Set the storage policy for an existing file
-   * @param src Path of an existing file. 
+   * Set the storage policy for a file/directory
+   * @param src Path of an existing file/directory. 
    * @param policyName The name of the storage policy
    * @throws SnapshotAccessControlException If access is denied
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
 Sat Aug 16 20:58:20 2014
@@ -44,9 +44,10 @@ public class HdfsFileStatus {
   private final String owner;
   private final String group;
   private final long fileId;
-  
+
   // Used by dir, not including dot and dotdot. Always zero for a regular file.
   private final int childrenNum;
+  private final byte storagePolicy;
   
   public static final byte[] EMPTY_NAME = new byte[0];
 
@@ -65,9 +66,9 @@ public class HdfsFileStatus {
    * @param fileId the file id
    */
   public HdfsFileStatus(long length, boolean isdir, int block_replication,
-                    long blocksize, long modification_time, long access_time,
-                    FsPermission permission, String owner, String group, 
-                    byte[] symlink, byte[] path, long fileId, int childrenNum) 
{
+      long blocksize, long modification_time, long access_time,
+      FsPermission permission, String owner, String group, byte[] symlink,
+      byte[] path, long fileId, int childrenNum, byte storagePolicy) {
     this.length = length;
     this.isdir = isdir;
     this.block_replication = (short)block_replication;
@@ -85,6 +86,7 @@ public class HdfsFileStatus {
     this.path = path;
     this.fileId = fileId;
     this.childrenNum = childrenNum;
+    this.storagePolicy = storagePolicy;
   }
 
   /**
@@ -242,6 +244,11 @@ public class HdfsFileStatus {
     return childrenNum;
   }
 
+  /** @return the storage policy id */
+  public final byte getStoragePolicy() {
+    return storagePolicy;
+  }
+
   final public FileStatus makeQualified(URI defaultUri, Path path) {
     return new FileStatus(getLen(), isDir(), getReplication(),
         getBlockSize(), getModificationTime(),

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
 Sat Aug 16 20:58:20 2014
@@ -34,7 +34,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 @InterfaceStability.Evolving
 public class HdfsLocatedFileStatus extends HdfsFileStatus {
   private final LocatedBlocks locations;
-  
+
   /**
    * Constructor
    * 
@@ -56,13 +56,13 @@ public class HdfsLocatedFileStatus exten
       int block_replication, long blocksize, long modification_time,
       long access_time, FsPermission permission, String owner, String group,
       byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
-      int childrenNum) {
+      int childrenNum, byte storagePolicy) {
     super(length, isdir, block_replication, blocksize, modification_time,
         access_time, permission, owner, group, symlink, path, fileId,
-        childrenNum);
+        childrenNum, storagePolicy);
     this.locations = locations;
   }
-       
+
   public LocatedBlocks getBlockLocations() {
     return locations;
   }

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java
 Sat Aug 16 20:58:20 2014
@@ -24,6 +24,7 @@ import java.util.Date;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSUtil;
 
 /**
@@ -61,7 +62,7 @@ public class SnapshottableDirectoryStatu
       int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
     this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
         access_time, permission, owner, group, null, localName, inodeId,
-        childrenNum);
+        childrenNum, BlockStoragePolicy.ID_UNSPECIFIED);
     this.snapshotNumber = snapshotNumber;
     this.snapshotQuota = snapshotQuota;
     this.parentFullPath = parentFullPath;

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
 Sat Aug 16 20:58:20 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -1315,7 +1316,9 @@ public class PBHelper {
         fs.getPath().toByteArray(),
         fs.hasFileId()? fs.getFileId(): INodeId.GRANDFATHER_INODE_ID,
         fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null,
-        fs.hasChildrenNum() ? fs.getChildrenNum() : -1);
+        fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
+        fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
+            : BlockStoragePolicy.ID_UNSPECIFIED);
   }
 
   public static SnapshottableDirectoryStatus convert(
@@ -1361,12 +1364,14 @@ public class PBHelper {
       setGroup(fs.getGroup()).
       setFileId(fs.getFileId()).
       setChildrenNum(fs.getChildrenNum()).
-      setPath(ByteString.copyFrom(fs.getLocalNameInBytes()));
+      setPath(ByteString.copyFrom(fs.getLocalNameInBytes())).
+      setStoragePolicy(fs.getStoragePolicy());
     if (fs.isSymlink())  {
       builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
     }
     if (fs instanceof HdfsLocatedFileStatus) {
-      LocatedBlocks locations = 
((HdfsLocatedFileStatus)fs).getBlockLocations();
+      final HdfsLocatedFileStatus lfs = (HdfsLocatedFileStatus) fs;
+      LocatedBlocks locations = lfs.getBlockLocations();
       if (locations != null) {
         builder.setLocations(PBHelper.convert(locations));
       }

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
 Sat Aug 16 20:58:20 2014
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.permission.A
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -946,10 +947,9 @@ public class FSDirectory implements Clos
     return file.getBlocks();
   }
 
-  /** Set block storage policy for a file */
+  /** Set block storage policy for a directory */
   void setStoragePolicy(String src, byte policyId)
-      throws SnapshotAccessControlException, UnresolvedLinkException,
-      FileNotFoundException, QuotaExceededException {
+      throws IOException {
     writeLock();
     try {
       unprotectedSetStoragePolicy(src, policyId);
@@ -959,13 +959,30 @@ public class FSDirectory implements Clos
   }
 
   void unprotectedSetStoragePolicy(String src, byte policyId)
-      throws SnapshotAccessControlException, UnresolvedLinkException,
-      FileNotFoundException, QuotaExceededException {
+      throws IOException {
     assert hasWriteLock();
     final INodesInPath iip = getINodesInPath4Write(src, true);
-    // TODO: currently we only support setting storage policy on a file
-    final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
-    inode.setStoragePolicyID(policyId, iip.getLatestSnapshotId());
+    final INode inode = iip.getLastINode();
+    if (inode == null) {
+      throw new FileNotFoundException("File/Directory does not exist: " + src);
+    }
+    final int snapshotId = iip.getLatestSnapshotId();
+    if (inode.isFile()) {
+      inode.asFile().setStoragePolicyID(policyId, snapshotId);
+    } else if (inode.isDirectory()) {
+      setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);  
+    } else {
+      throw new FileNotFoundException(src + " is not a file or directory");
+    }
+  }
+
+  private void setDirStoragePolicy(INodeDirectory inode, byte policyId,
+      int latestSnapshotId) throws IOException {
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+    XAttr xAttr = BlockStoragePolicy.buildXAttr(policyId);
+    List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, 
Arrays.asList(xAttr),
+        EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
+    XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
   }
 
   /**
@@ -1313,7 +1330,8 @@ public class FSDirectory implements Clos
    * @return a partial listing starting after startAfter
    */
   DirectoryListing getListing(String src, byte[] startAfter,
-      boolean needLocation) throws UnresolvedLinkException, IOException {
+      boolean needLocation, boolean isSuperUser)
+      throws UnresolvedLinkException, IOException {
     String srcs = normalizePath(src);
 
     readLock();
@@ -1321,16 +1339,19 @@ public class FSDirectory implements Clos
       if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
         return getSnapshotsListing(srcs, startAfter);
       }
-      final INodesInPath inodesInPath = getLastINodeInPath(srcs, true);
+      final INodesInPath inodesInPath = getINodesInPath(srcs, true);
       final int snapshot = inodesInPath.getPathSnapshotId();
-      final INode targetNode = inodesInPath.getINode(0);
+      final INode[] inodes = inodesInPath.getINodes();
+      final INode targetNode = inodes[inodes.length - 1];
+      byte parentStoragePolicy = isSuperUser ? getStoragePolicy(inodes,
+          snapshot) : BlockStoragePolicy.ID_UNSPECIFIED;
       if (targetNode == null)
         return null;
       
       if (!targetNode.isDirectory()) {
         return new DirectoryListing(
             new HdfsFileStatus[]{createFileStatus(HdfsFileStatus.EMPTY_NAME,
-                targetNode, needLocation, snapshot)}, 0);
+                targetNode, needLocation, parentStoragePolicy, snapshot)}, 0);
       }
 
       final INodeDirectory dirInode = targetNode.asDirectory();
@@ -1343,8 +1364,10 @@ public class FSDirectory implements Clos
       HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
       for (int i=0; i<numOfListing && locationBudget>0; i++) {
         INode cur = contents.get(startChild+i);
+        byte curPolicy = cur.getStoragePolicyID(snapshot);
         listing[i] = createFileStatus(cur.getLocalNameBytes(), cur,
-            needLocation, snapshot);
+            needLocation, curPolicy != BlockStoragePolicy.ID_UNSPECIFIED ?
+                curPolicy : parentStoragePolicy, snapshot);
         listingCnt++;
         if (needLocation) {
             // Once we  hit lsLimit locations, stop.
@@ -1395,7 +1418,7 @@ public class FSDirectory implements Clos
     for (int i = 0; i < numOfListing; i++) {
       Root sRoot = snapshots.get(i + skipSize).getRoot();
       listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot,
-          Snapshot.CURRENT_STATE_ID);
+          BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID);
     }
     return new DirectoryListing(
         listing, snapshots.size() - skipSize - numOfListing);
@@ -1417,8 +1440,8 @@ public class FSDirectory implements Clos
       }
       final INodesInPath inodesInPath = getLastINodeInPath(srcs, resolveLink);
       final INode i = inodesInPath.getINode(0);
-      return i == null? null: createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
-          inodesInPath.getPathSnapshotId());
+      return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
+          BlockStoragePolicy.ID_UNSPECIFIED, inodesInPath.getPathSnapshotId());
     } finally {
       readUnlock();
     }
@@ -1435,7 +1458,7 @@ public class FSDirectory implements Clos
       throws UnresolvedLinkException {
     if (getINode4DotSnapshot(src) != null) {
       return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
-          HdfsFileStatus.EMPTY_NAME, -1L, 0);
+          HdfsFileStatus.EMPTY_NAME, -1L, 0, 
BlockStoragePolicy.ID_UNSPECIFIED);
     }
     return null;
   }
@@ -2247,18 +2270,19 @@ public class FSDirectory implements Clos
    * @throws IOException if any error occurs
    */
   private HdfsFileStatus createFileStatus(byte[] path, INode node,
-      boolean needLocation, int snapshot) throws IOException {
+      boolean needLocation, byte storagePolicy, int snapshot) throws 
IOException {
     if (needLocation) {
-      return createLocatedFileStatus(path, node, snapshot);
+      return createLocatedFileStatus(path, node, storagePolicy, snapshot);
     } else {
-      return createFileStatus(path, node, snapshot);
+      return createFileStatus(path, node, storagePolicy, snapshot);
     }
   }
+
   /**
    * Create FileStatus by file INode 
    */
-   HdfsFileStatus createFileStatus(byte[] path, INode node,
-       int snapshot) {
+  HdfsFileStatus createFileStatus(byte[] path, INode node, byte storagePolicy,
+      int snapshot) {
      long size = 0;     // length is zero for directories
      short replication = 0;
      long blocksize = 0;
@@ -2270,7 +2294,7 @@ public class FSDirectory implements Clos
      }
      int childrenNum = node.isDirectory() ? 
          node.asDirectory().getChildrenNum(snapshot) : 0;
-         
+
      return new HdfsFileStatus(
         size, 
         node.isDirectory(), 
@@ -2284,14 +2308,24 @@ public class FSDirectory implements Clos
         node.isSymlink() ? node.asSymlink().getSymlink() : null,
         path,
         node.getId(),
-        childrenNum);
+        childrenNum, storagePolicy);
   }
 
+  private byte getStoragePolicy(INode[] inodes, int snapshotId) {
+    for (int i = inodes.length - 1; i >= 0; i--) {
+      byte policy = inodes[i].getStoragePolicyID(snapshotId);
+      if (policy != BlockStoragePolicy.ID_UNSPECIFIED) {
+        return policy;
+      }
+    }
+    return BlockStoragePolicy.ID_UNSPECIFIED;
+  }  
+
   /**
    * Create FileStatus with location info by file INode
    */
   private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path,
-      INode node, int snapshot) throws IOException {
+      INode node, byte storagePolicy, int snapshot) throws IOException {
     assert hasReadLock();
     long size = 0; // length is zero for directories
     short replication = 0;
@@ -2316,7 +2350,7 @@ public class FSDirectory implements Clos
     }
     int childrenNum = node.isDirectory() ? 
         node.asDirectory().getChildrenNum(snapshot) : 0;
-        
+
     HdfsLocatedFileStatus status =
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
           blocksize, node.getModificationTime(snapshot),
@@ -2324,7 +2358,7 @@ public class FSDirectory implements Clos
           getPermissionForFileStatus(node, snapshot),
           node.getUserName(snapshot), node.getGroupName(snapshot),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-          node.getId(), loc, childrenNum);
+          node.getId(), loc, childrenNum, storagePolicy);
     // Set caching information for the located blocks.
     if (loc != null) {
       CacheManager cacheManager = namesystem.getCacheManager();

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
 Sat Aug 16 20:58:20 2014
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -365,7 +366,8 @@ public class FSEditLogLoader {
         // add the op into retry cache if necessary
         if (toAddRetryCache) {
           HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
-              HdfsFileStatus.EMPTY_NAME, newFile, Snapshot.CURRENT_STATE_ID);
+              HdfsFileStatus.EMPTY_NAME, newFile,
+              BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID);
           fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
               addCloseOp.rpcCallId, stat);
         }

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 Sat Aug 16 20:58:20 2014
@@ -2221,9 +2221,9 @@ public class FSNamesystem implements Nam
   }
 
   /**
-   * Set the storage policy for an existing file.
+   * Set the storage policy for a file or a directory.
    *
-   * @param src file name
+   * @param src file/directory path
    * @param policyName storage policy name
    */
   void setStoragePolicy(String src, final String policyName)
@@ -4529,16 +4529,18 @@ public class FSNamesystem implements Nam
               "Can't find startAfter " + startAfterString);
         }
       }
-      
+
+      boolean isSuperUser = true;
       if (isPermissionEnabled) {
         if (dir.isDir(src)) {
           checkPathAccess(pc, src, FsAction.READ_EXECUTE);
         } else {
           checkTraverse(pc, src);
         }
+        isSuperUser = pc.isSuperUser();
       }
       logAuditEvent(true, "listStatus", src);
-      dl = dir.getListing(src, startAfter, needLocation);
+      dl = dir.getListing(src, startAfter, needLocation, isSuperUser);
     } finally {
       readUnlock();
     }

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
 Sat Aug 16 20:58:20 2014
@@ -684,6 +684,14 @@ public abstract class INode implements I
     return this;
   }
 
+  /**
+   * @return the storage policy id of the inode
+   */
+  public abstract byte getStoragePolicyID(int snapshotId);
+
+  public byte getStoragePolicyID() {
+    return getStoragePolicyID(Snapshot.CURRENT_STATE_ID);
+  }
 
   /**
    * Breaks {@code path} into components.

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
 Sat Aug 16 20:58:20 2014
@@ -61,6 +61,9 @@ public interface INodeAttributes {
   /** @return the access time. */
   public long getAccessTime();
 
+  /** @return the storage policy ID */
+  public byte getStoragePolicyID();
+
   /** A read-only copy of the inode attributes. */
   public static abstract class SnapshotCopy implements INodeAttributes {
     private final byte[] name;

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
 Sat Aug 16 20:58:20 2014
@@ -26,7 +26,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.util.ReadO
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Directory INode class.
@@ -103,6 +106,22 @@ public class INodeDirectory extends INod
     return this;
   }
 
+  @Override
+  public byte getStoragePolicyID(int snapshotId) {
+    if (snapshotId != Snapshot.CURRENT_STATE_ID) {
+      return getSnapshotINode(snapshotId).getStoragePolicyID();
+    }
+    XAttrFeature f = getXAttrFeature();
+    ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
+        .getXAttrs();
+    for (XAttr xattr : xattrs) {
+      if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) {
+        return (xattr.getValue())[0];
+      }
+    }
+    return BlockStoragePolicy.ID_UNSPECIFIED;
+  }
+
   void setQuota(long nsQuota, long dsQuota) {
     DirectoryWithQuotaFeature quota = getDirectoryWithQuotaFeature();
     if (quota != null) {

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
 Sat Aug 16 20:58:20 2014
@@ -18,10 +18,12 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * The attributes of an inode.
@@ -58,6 +60,19 @@ public interface INodeDirectoryAttribute
           && getAclFeature() == other.getAclFeature()
           && getXAttrFeature() == other.getXAttrFeature();
     }
+
+    @Override
+    public byte getStoragePolicyID() {
+      XAttrFeature f = getXAttrFeature();
+      ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
+          .getXAttrs();
+      for (XAttr xattr : xattrs) {
+        if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) {
+          return (xattr.getValue())[0];
+        }
+      }
+      return BlockStoragePolicy.ID_UNSPECIFIED;
+    }
   }
 
   public static class CopyWithQuota extends 
INodeDirectoryAttributes.SnapshotCopy {

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
 Sat Aug 16 20:58:20 2014
@@ -368,19 +368,27 @@ public class INodeFile extends INodeWith
   }
 
   @Override
+  public byte getStoragePolicyID(int snapshotId) {
+    if (snapshotId != Snapshot.CURRENT_STATE_ID) {
+      return getSnapshotINode(snapshotId).getStoragePolicyID();
+    }
+    return getStoragePolicyID();
+  }
+
+  @Override
   public byte getStoragePolicyID() {
     return HeaderFormat.getStoragePolicyID(header);
   }
 
-  /** Set the policy id of the file */
-  public final void setStoragePolicyID(byte policyId) {
-    header = HeaderFormat.STORAGE_POLICY_ID.BITS.combine(policyId, header);
+  private void setStoragePolicyID(byte storagePolicyId) {
+    header = HeaderFormat.STORAGE_POLICY_ID.BITS.combine(storagePolicyId,
+        header);
   }
 
-  public final void setStoragePolicyID(byte policyId, int lastSnapshotId)
-      throws QuotaExceededException {
-    recordModification(lastSnapshotId);
-    setStoragePolicyID(policyId);
+  public final void setStoragePolicyID(byte storagePolicyId,
+      int latestSnapshotId) throws QuotaExceededException {
+    recordModification(latestSnapshotId);
+    setStoragePolicyID(storagePolicyId);
   }
 
   @Override

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
 Sat Aug 16 20:58:20 2014
@@ -32,9 +32,6 @@ public interface INodeFileAttributes ext
 
   /** @return preferred block size in bytes */
   public long getPreferredBlockSize();
-  
-  /** @return the storage policy ID. */
-  public byte getStoragePolicyID();
 
   /** @return the header as a long. */
   public long getHeaderLong();

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
 Sat Aug 16 20:58:20 2014
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
 import org.apache.hadoop.util.GSet;
@@ -121,6 +122,11 @@ public class INodeMap {
           boolean countDiffChange) throws QuotaExceededException {
         return null;
       }
+
+      @Override
+      public byte getStoragePolicyID(int snapshotId) {
+        return BlockStoragePolicy.ID_UNSPECIFIED;
+      }
     };
       
     return map.get(inode);

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
 Sat Aug 16 20:58:20 2014
@@ -287,6 +287,11 @@ public abstract class INodeReference ext
   }
 
   @Override
+  public final byte getStoragePolicyID(int snapshotId) {
+    return referred.getStoragePolicyID(snapshotId);
+  }
+
+  @Override
   final void recordModification(int latestSnapshotId)
       throws QuotaExceededException {
     referred.recordModification(latestSnapshotId);

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
 Sat Aug 16 20:58:20 2014
@@ -145,4 +145,10 @@ public class INodeSymlink extends INodeW
   public void addXAttrFeature(XAttrFeature f) {
     throw new UnsupportedOperationException("XAttrs are not supported on 
symlinks");
   }
+
+  @Override
+  public byte getStoragePolicyID(int snapshotId) {
+    throw new UnsupportedOperationException(
+        "Storage policy are not supported on symlinks");
+  }
 }

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
 Sat Aug 16 20:58:20 2014
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.*;
@@ -220,6 +221,7 @@ public class JsonUtil {
     m.put("replication", status.getReplication());
     m.put("fileId", status.getFileId());
     m.put("childrenNum", status.getChildrenNum());
+    m.put("storagePolicy", status.getStoragePolicy());
     return includeType ? toJsonString(FileStatus.class, m): JSON.toString(m);
   }
 
@@ -250,9 +252,12 @@ public class JsonUtil {
     Long childrenNumLong = (Long) m.get("childrenNum");
     final int childrenNum = (childrenNumLong == null) ? -1
             : childrenNumLong.intValue();
+    final byte storagePolicy = m.containsKey("storagePolicy") ?
+        (byte) (long) (Long) m.get("storagePolicy") :
+          BlockStoragePolicy.ID_UNSPECIFIED;
     return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
-        blockSize, mTime, aTime, permission, owner, group,
-        symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum);
+        blockSize, mTime, aTime, permission, owner, group, symlink,
+        DFSUtil.string2Bytes(localName), fileId, childrenNum, storagePolicy);
   }
 
   /** Convert an ExtendedBlock to a Json map. */

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
 Sat Aug 16 20:58:20 2014
@@ -244,7 +244,8 @@ message HdfsFileStatusProto {
   // Optional field for fileId
   optional uint64 fileId = 13 [default = 0]; // default as an invalid id
   optional int32 childrenNum = 14 [default = -1];
-} 
+  optional uint32 storagePolicy = 15 [default = 0]; // block storage policy id
+}
 
 /**
  * Checksum algorithms/types used in HDFS

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
 Sat Aug 16 20:58:20 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
+
 import java.io.FileNotFoundException;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -25,9 +27,10 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
-import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -133,12 +136,18 @@ public class TestBlockStoragePolicy {
     Assert.assertEquals(null, policy.getReplicationFallback(both));
   }
 
+  private void checkDirectoryListing(HdfsFileStatus[] stats, byte... policies) 
{
+    Assert.assertEquals(stats.length, policies.length);
+    for (int i = 0; i < stats.length; i++) {
+      Assert.assertEquals(stats[i].getStoragePolicy(), policies[i]);
+    }
+  }
+
   @Test
   public void testSetStoragePolicy() throws Exception {
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(REPLICATION).build();
     cluster.waitActive();
-    FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
     final DistributedFileSystem fs = cluster.getFileSystem();
     try {
       final Path dir = new Path("/testSetStoragePolicy");
@@ -158,10 +167,13 @@ public class TestBlockStoragePolicy {
         GenericTestUtils.assertExceptionContains(invalidPolicyName, e);
       }
 
-      // check internal status
-      INodeFile fooFileNode = 
fsdir.getINode4Write(fooFile.toString()).asFile();
-      INodeFile barFile1Node = 
fsdir.getINode4Write(barFile1.toString()).asFile();
-      INodeFile barFile2Node = 
fsdir.getINode4Write(barFile2.toString()).asFile();
+      // check storage policy
+      HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
+          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
+      HdfsFileStatus[] barList = fs.getClient().listPaths(barDir.toString(),
+          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
+      checkDirectoryListing(dirList, ID_UNSPECIFIED, ID_UNSPECIFIED);
+      checkDirectoryListing(barList, ID_UNSPECIFIED, ID_UNSPECIFIED);
 
       final Path invalidPath = new Path("/invalidPath");
       try {
@@ -172,37 +184,116 @@ public class TestBlockStoragePolicy {
       }
 
       fs.setStoragePolicy(fooFile, "COLD");
-      fs.setStoragePolicy(barFile1, "WARM");
-      fs.setStoragePolicy(barFile2, "WARM");
-      // TODO: set storage policy on a directory
-
-      // check internal status
-      Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
-      Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
-      Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
+      fs.setStoragePolicy(barDir, "WARM");
+      fs.setStoragePolicy(barFile2, "HOT");
+
+      dirList = fs.getClient().listPaths(dir.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing();
+      barList = fs.getClient().listPaths(barDir.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing();
+      checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
+      checkDirectoryListing(barList, WARM, HOT);
 
       // restart namenode to make sure the editlog is correct
       cluster.restartNameNode(true);
-      fsdir = cluster.getNamesystem().getFSDirectory();
-      fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
-      Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
-      barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
-      Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
-      barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
-      Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
+      dirList = fs.getClient().listPaths(dir.toString(),
+          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
+      barList = fs.getClient().listPaths(barDir.toString(),
+          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
+      checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
+      checkDirectoryListing(barList, WARM, HOT);
 
       // restart namenode with checkpoint to make sure the fsimage is correct
       fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
       fs.saveNamespace();
       fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
       cluster.restartNameNode(true);
-      fsdir = cluster.getNamesystem().getFSDirectory();
-      fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
-      Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
-      barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
-      Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
-      barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
-      Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
+      dirList = fs.getClient().listPaths(dir.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing();
+      barList = fs.getClient().listPaths(barDir.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing();
+      checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
+      checkDirectoryListing(barList, WARM, HOT);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testSetStoragePolicyWithSnapshot() throws Exception {
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(REPLICATION).build();
+    cluster.waitActive();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      final Path dir = new Path("/testSetStoragePolicyWithSnapshot");
+      final Path fooDir = new Path(dir, "foo");
+      final Path fooFile1= new Path(fooDir, "f1");
+      final Path fooFile2= new Path(fooDir, "f2");
+      DFSTestUtil.createFile(fs, fooFile1, FILE_LEN, REPLICATION, 0L);
+      DFSTestUtil.createFile(fs, fooFile2, FILE_LEN, REPLICATION, 0L);
+
+      fs.setStoragePolicy(fooDir, "WARM");
+
+      HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
+          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
+      checkDirectoryListing(dirList, WARM);
+      HdfsFileStatus[] fooList = fs.getClient().listPaths(fooDir.toString(),
+          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
+      checkDirectoryListing(fooList, WARM, WARM);
+
+      // take snapshot
+      SnapshotTestHelper.createSnapshot(fs, dir, "s1");
+      // change the storage policy of fooFile1
+      fs.setStoragePolicy(fooFile1, "COLD");
+
+      fooList = fs.getClient().listPaths(fooDir.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing();
+      checkDirectoryListing(fooList, COLD, WARM);
+
+      // check the policy for /dir/.snapshot/s1/foo/f1
+      Path s1f1 = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo/f1");
+      DirectoryListing f1Listing = fs.getClient().listPaths(s1f1.toString(),
+          HdfsFileStatus.EMPTY_NAME);
+      checkDirectoryListing(f1Listing.getPartialListing(), WARM);
+
+      // delete f1
+      fs.delete(fooFile1, true);
+      fooList = fs.getClient().listPaths(fooDir.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing();
+      checkDirectoryListing(fooList, WARM);
+      // check the policy for /dir/.snapshot/s1/foo/f1 again after the deletion
+      checkDirectoryListing(fs.getClient().listPaths(s1f1.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM);
+
+      // change the storage policy of foo dir
+      fs.setStoragePolicy(fooDir, "HOT");
+      // /dir/foo is now hot
+      dirList = fs.getClient().listPaths(dir.toString(),
+          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
+      checkDirectoryListing(dirList, HOT);
+      // /dir/foo/f2 is hot
+      fooList = fs.getClient().listPaths(fooDir.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing();
+      checkDirectoryListing(fooList, HOT);
+
+      // check storage policy of snapshot path
+      Path s1 = SnapshotTestHelper.getSnapshotRoot(dir, "s1");
+      Path s1foo = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo");
+      checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM);
+      // /dir/.snapshot/.s1/foo/f1 and /dir/.snapshot/.s1/foo/f2 are warm 
+      checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM, WARM);
+
+      // delete foo
+      fs.delete(fooDir, true);
+      checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM); 
+      checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
+          HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM, WARM);
     } finally {
       if (cluster != null) {
         cluster.shutdown();

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
 Sat Aug 16 20:58:20 2014
@@ -253,12 +253,12 @@ public class TestDFSClientRetries {
     Mockito.doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0)).when(mockNN).getFileInfo(anyString());
+                1010, 0, (byte) 0)).when(mockNN).getFileInfo(anyString());
     
     Mockito.doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0))
+                1010, 0, (byte) 0))
         .when(mockNN)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
 Sat Aug 16 20:58:20 2014
@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.anyShort;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyShort;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -35,7 +35,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
@@ -339,12 +338,12 @@ public class TestLease {
     Mockito.doReturn(
         new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
             (short) 777), "owner", "group", new byte[0], new byte[0],
-            1010, 0)).when(mcp).getFileInfo(anyString());
+            1010, 0, (byte) 0)).when(mcp).getFileInfo(anyString());
     Mockito
         .doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0))
+                1010, 0, (byte) 0))
         .when(mcp)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
 Sat Aug 16 20:58:20 2014
@@ -1015,10 +1015,11 @@ public class TestFsck {
     path = DFSUtil.string2Bytes(pathString);
     long fileId = 312321L;
     int numChildren = 1;
+    byte storagePolicy = 0;
 
     HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
         blockSize, modTime, accessTime, perms, owner, group, symlink, path,
-        fileId, numChildren);
+        fileId, numChildren, storagePolicy);
     Result res = new Result(conf);
 
     try {

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java?rev=1618416&r1=1618415&r2=1618416&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java
 Sat Aug 16 20:58:20 2014
@@ -64,7 +64,7 @@ public class TestJsonUtil {
     final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
         now, now + 10, new FsPermission((short) 0644), "user", "group",
         DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
-        INodeId.GRANDFATHER_INODE_ID, 0);
+        INodeId.GRANDFATHER_INODE_ID, 0, (byte) 0);
     final FileStatus fstatus = toFileStatus(status, parent);
     System.out.println("status  = " + status);
     System.out.println("fstatus = " + fstatus);


Reply via email to