http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 7781244..532617f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -250,6 +250,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -2063,7 +2064,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // update the quota: use the preferred block size for UC block
     final long diff =
         file.getPreferredBlockSize() - truncatedBlockUC.getNumBytes();
-    dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
+    dir.updateSpaceConsumed(iip, 0, diff, file.getBlockReplication());
     return newBlock;
   }
 
@@ -2671,7 +2672,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (ret != null) {
         // update the quota: use the preferred block size for UC block
         final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
-        dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
+        dir.updateSpaceConsumed(iip, 0, diff, file.getBlockReplication());
       }
     } else {
       BlockInfoContiguous lastBlock = file.getLastBlock();
@@ -3803,19 +3804,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   /**
    * Set the namespace quota and diskspace quota for a directory.
-   * See {@link ClientProtocol#setQuota(String, long, long)} for the 
+   * See {@link ClientProtocol#setQuota(String, long, long, StorageType)} for the
    * contract.
    * 
    * Note: This does not support ".inodes" relative path.
    */
-  void setQuota(String src, long nsQuota, long dsQuota)
+  void setQuota(String src, long nsQuota, long dsQuota, StorageType type)
       throws IOException {
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set quota on " + src);
-      FSDirAttrOp.setQuota(dir, src, nsQuota, dsQuota);
+      FSDirAttrOp.setQuota(dir, src, nsQuota, dsQuota, type);
     } finally {
       writeUnlock();
     }
@@ -4042,7 +4043,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
     if (diff > 0) {
       try {
-        dir.updateSpaceConsumed(iip, 0, -diff*fileINode.getFileReplication());
+        dir.updateSpaceConsumed(iip, 0, -diff, fileINode.getFileReplication());
       } catch (IOException e) {
         LOG.warn("Unexpected exception while updating disk space.", e);
       }
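
The hunks above stop passing a pre-multiplied diskspace delta (diff * replication) to updateSpaceConsumed() and instead pass the un-replicated delta and the replication factor separately. A minimal standalone sketch of the bookkeeping this makes possible; typeDeltas() and the chosen-type list below are hypothetical illustrations, not FSDirectory's API:

import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hdfs.StorageType;

public class SpaceDeltaSketch {
  // Charge each replica's intended storage type with the un-replicated delta.
  static Map<StorageType, Long> typeDeltas(long delta, List<StorageType> chosen) {
    Map<StorageType, Long> deltas = new EnumMap<StorageType, Long>(StorageType.class);
    for (StorageType t : chosen) {
      Long current = deltas.get(t);
      deltas.put(t, (current == null ? 0L : current) + delta);
    }
    return deltas;
  }

  public static void main(String[] args) {
    long preferredBlockSize = 128L << 20;   // preferred size of the UC block
    long ucBlockBytes = 10L << 20;          // bytes actually written so far
    short replication = 3;

    long delta = preferredBlockSize - ucBlockBytes;
    long diskspaceDelta = delta * replication;   // what the old single argument carried

    // hypothetical per-replica placement, e.g. from a ONE_SSD-like policy
    List<StorageType> chosen =
        Arrays.asList(StorageType.SSD, StorageType.DISK, StorageType.DISK);

    System.out.println("diskspace delta = " + diskspaceDelta);
    System.out.println("per-type deltas = " + typeDeltas(delta, chosen));
  }
}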

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
index bafde45..7f3bf38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -384,7 +385,9 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    * 2.4 To clean {@link INodeDirectory} with snapshot: delete the corresponding
    * snapshot in its diff list. Recursively clean its children.
    * </pre>
-   * 
+   *
+   * @param bsps
+   *          block storage policy suite to calculate intended storage type usage
    * @param snapshotId
    *          The id of the snapshot to delete. 
    *          {@link Snapshot#CURRENT_STATE_ID} means to delete the current
@@ -401,7 +404,8 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    *          inodeMap
    * @return quota usage delta when deleting a snapshot
    */
-  public abstract Quota.Counts cleanSubtree(final int snapshotId,
+  public abstract QuotaCounts cleanSubtree(final BlockStoragePolicySuite bsps,
+      final int snapshotId,
       int priorSnapshotId, BlocksMapUpdateInfo collectedBlocks,
       List<INode> removedINodes);
   
@@ -411,7 +415,11 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    * directory, the method goes down the subtree and collects blocks from the
    * descents, and clears its parent/children references as well. The method
    * also clears the diff list if the INode contains snapshot diff list.
-   * 
+   *
+   * @param bsps
+   *          block storage policy suite to calculate intended storage type usage
+   *          This is needed because INodeReference#destroyAndCollectBlocks() needs
+   *          to call INode#cleanSubtree(), which calls INode#computeQuotaUsage().
    * @param collectedBlocks
    *          blocks collected from the descents for further block
    *          deletion/update will be added to this map.
@@ -420,6 +428,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    *          inodeMap
    */
   public abstract void destroyAndCollectBlocks(
+      BlockStoragePolicySuite bsps,
       BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes);
 
   /** Compute {@link ContentSummary}. Blocking call */
@@ -434,11 +443,12 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
   public final ContentSummary computeAndConvertContentSummary(
       ContentSummaryComputationContext summary) {
     Content.Counts counts = computeContentSummary(summary).getCounts();
-    final Quota.Counts q = getQuotaCounts();
+    final QuotaCounts q = getQuotaCounts();
     return new ContentSummary(counts.get(Content.LENGTH),
         counts.get(Content.FILE) + counts.get(Content.SYMLINK),
-        counts.get(Content.DIRECTORY), q.get(Quota.NAMESPACE),
-        counts.get(Content.DISKSPACE), q.get(Quota.DISKSPACE));
+        counts.get(Content.DIRECTORY), q.getNameSpace(),
+        counts.get(Content.DISKSPACE), q.getDiskSpace());
+    // TODO: storage type quota reporting HDFS-7701.
   }
 
   /**
@@ -450,24 +460,24 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
   public abstract ContentSummaryComputationContext computeContentSummary(
       ContentSummaryComputationContext summary);
 
-  
+
   /**
-   * Check and add namespace/diskspace consumed to itself and the ancestors.
+   * Check and add namespace/diskspace/storagetype consumed to itself and the ancestors.
    * @throws QuotaExceededException if quote is violated.
    */
-  public void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify) 
-      throws QuotaExceededException {
-    addSpaceConsumed2Parent(nsDelta, dsDelta, verify);
+  public void addSpaceConsumed(QuotaCounts counts, boolean verify)
+    throws QuotaExceededException {
+    addSpaceConsumed2Parent(counts, verify);
   }
 
   /**
-   * Check and add namespace/diskspace consumed to itself and the ancestors.
+   * Check and add namespace/diskspace/storagetype consumed to itself and the ancestors.
    * @throws QuotaExceededException if quote is violated.
    */
-  void addSpaceConsumed2Parent(long nsDelta, long dsDelta, boolean verify) 
-      throws QuotaExceededException {
+  void addSpaceConsumed2Parent(QuotaCounts counts, boolean verify)
+    throws QuotaExceededException {
     if (parent != null) {
-      parent.addSpaceConsumed(nsDelta, dsDelta, verify);
+      parent.addSpaceConsumed(counts, verify);
     }
   }
 
@@ -475,20 +485,24 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    * Get the quota set for this inode
    * @return the quota counts.  The count is -1 if it is not set.
    */
-  public Quota.Counts getQuotaCounts() {
-    return Quota.Counts.newInstance(-1, -1);
+  public QuotaCounts getQuotaCounts() {
+    return new QuotaCounts.Builder().
+        nameCount(HdfsConstants.QUOTA_RESET).
+        spaceCount(HdfsConstants.QUOTA_RESET).
+        typeCounts(HdfsConstants.QUOTA_RESET).
+        build();
   }
-  
+
   public final boolean isQuotaSet() {
-    final Quota.Counts q = getQuotaCounts();
-    return q.get(Quota.NAMESPACE) >= 0 || q.get(Quota.DISKSPACE) >= 0;
+    final QuotaCounts qc = getQuotaCounts();
+    return qc.anyNsSpCountGreaterOrEqual(0) || qc.anyTypeCountGreaterOrEqual(0);
   }
-  
+
   /**
    * Count subtree {@link Quota#NAMESPACE} and {@link Quota#DISKSPACE} usages.
    */
-  public final Quota.Counts computeQuotaUsage() {
-    return computeQuotaUsage(new Quota.Counts(), true);
+  public final QuotaCounts computeQuotaUsage(BlockStoragePolicySuite bsps) {
+    return computeQuotaUsage(bsps, new QuotaCounts.Builder().build(), true);
   }
 
   /**
@@ -511,7 +525,8 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    * creation time of the snapshot associated with the {@link WithName} node.
    * We do not count in the size of the diff list.  
    * <pre>
-   * 
+   *
+   * @param bsps Block storage policy suite to calculate intended storage type usage
    * @param counts The subtree counts for returning.
    * @param useCache Whether to use cached quota usage. Note that 
    *                 {@link WithName} node never uses cache for its subtree.
@@ -521,14 +536,15 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    *                       {@link WithName} node.
    * @return The same objects as the counts parameter.
    */
-  public abstract Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean useCache, int lastSnapshotId);
+  public abstract QuotaCounts computeQuotaUsage(
+    BlockStoragePolicySuite bsps,
+    QuotaCounts counts, boolean useCache, int lastSnapshotId);
 
-  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean useCache) {
-    return computeQuotaUsage(counts, useCache, Snapshot.CURRENT_STATE_ID);
+  public final QuotaCounts computeQuotaUsage(
+    BlockStoragePolicySuite bsps, QuotaCounts counts, boolean useCache) {
+    return computeQuotaUsage(bsps, counts, useCache, Snapshot.CURRENT_STATE_ID);
   }
-  
+
   /**
    * @return null if the local name is null; otherwise, return the local name.
    */
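
Quota.Counts is replaced by the new QuotaCounts class throughout INode. The sketch below exercises only calls visible in this patch (the Builder setters, addNameSpace/addDiskSpace/addTypeSpace, negation(), getNameSpace()/getDiskSpace()); it is illustrative, not a definitive API reference:

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;

public class QuotaCountsSketch {
  public static void main(String[] args) {
    // "unset" quotas use -1 (HdfsConstants.QUOTA_RESET) for ns, ds and all types
    QuotaCounts unset = new QuotaCounts.Builder()
        .nameCount(-1).spaceCount(-1).typeCounts(-1).build();

    // accumulate subtree usage the way computeQuotaUsage(bsps, ...) does
    QuotaCounts usage = new QuotaCounts.Builder().build();
    usage.addNameSpace(1);                           // one inode
    usage.addDiskSpace(3L * (128L << 20));           // replicated bytes
    usage.addTypeSpace(StorageType.SSD, 128L << 20); // un-replicated bytes per type

    // cleanSubtree(...) subtracts usage from the cached counts via negation()
    QuotaCounts released = usage.negation();

    System.out.println("ns=" + usage.getNameSpace()
        + " ds=" + usage.getDiskSpace()
        + " quota set? " + (unset.getNameSpace() >= 0));
    // 'released' would be handed to addSpaceConsumed2Cache(...) as in the patch
  }
}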

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index 6bf57f0..f9d160b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFea
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
@@ -136,35 +137,45 @@ public class INodeDirectory extends INodeWithAdditionalFields
         BlockStoragePolicySuite.ID_UNSPECIFIED;
   }
 
-  void setQuota(long nsQuota, long dsQuota) {
+  void setQuota(BlockStoragePolicySuite bsps, long nsQuota, long dsQuota, StorageType type) {
     DirectoryWithQuotaFeature quota = getDirectoryWithQuotaFeature();
     if (quota != null) {
       // already has quota; so set the quota to the new values
-      quota.setQuota(nsQuota, dsQuota);
+      if (type != null) {
+        quota.setQuota(dsQuota, type);
+      } else {
+        quota.setQuota(nsQuota, dsQuota);
+      }
       if (!isQuotaSet() && !isRoot()) {
         removeFeature(quota);
       }
     } else {
-      final Quota.Counts c = computeQuotaUsage();
-      quota = addDirectoryWithQuotaFeature(nsQuota, dsQuota);
-      quota.setSpaceConsumed(c.get(Quota.NAMESPACE), c.get(Quota.DISKSPACE));
+      final QuotaCounts c = computeQuotaUsage(bsps);
+      DirectoryWithQuotaFeature.Builder builder =
+          new DirectoryWithQuotaFeature.Builder().nameSpaceQuota(nsQuota);
+      if (type != null) {
+        builder.typeQuota(type, dsQuota);
+      } else {
+        builder.spaceQuota(dsQuota);
+      }
+      addDirectoryWithQuotaFeature(builder.build()).setSpaceConsumed(c);
     }
   }
 
   @Override
-  public Quota.Counts getQuotaCounts() {
+  public QuotaCounts getQuotaCounts() {
     final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
     return q != null? q.getQuota(): super.getQuotaCounts();
   }
 
   @Override
-  public void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify) 
-      throws QuotaExceededException {
+  public void addSpaceConsumed(QuotaCounts counts, boolean verify)
+    throws QuotaExceededException {
     final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
     if (q != null) {
-      q.addSpaceConsumed(this, nsDelta, dsDelta, verify);
+      q.addSpaceConsumed(this, counts, verify);
     } else {
-      addSpaceConsumed2Parent(nsDelta, dsDelta, verify);
+      addSpaceConsumed2Parent(counts, verify);
     }
   }
 
@@ -182,12 +193,10 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
 
   DirectoryWithQuotaFeature addDirectoryWithQuotaFeature(
-      long nsQuota, long dsQuota) {
+      DirectoryWithQuotaFeature q) {
     Preconditions.checkState(!isWithQuota(), "Directory is already with quota");
-    final DirectoryWithQuotaFeature quota = new DirectoryWithQuotaFeature(
-        nsQuota, dsQuota);
-    addFeature(quota);
-    return quota;
+    addFeature(q);
+    return q;
   }
 
   int searchChildren(byte[] name) {
@@ -254,10 +263,10 @@ public class INodeDirectory extends INodeWithAdditionalFields
     return getDirectorySnapshottableFeature().addSnapshot(this, id, name);
   }
 
-  public Snapshot removeSnapshot(String snapshotName,
+  public Snapshot removeSnapshot(BlockStoragePolicySuite bsps, String snapshotName,
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
       throws SnapshotException {
-    return getDirectorySnapshottableFeature().removeSnapshot(this,
+    return getDirectorySnapshottableFeature().removeSnapshot(bsps, this,
         snapshotName, collectedBlocks, removedINodes);
   }
 
@@ -559,7 +568,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
 
   @Override
-  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
+  public QuotaCounts computeQuotaUsage(BlockStoragePolicySuite bsps, QuotaCounts counts, boolean useCache,
       int lastSnapshotId) {
     final DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     
@@ -570,9 +579,9 @@ public class INodeDirectory extends INodeWithAdditionalFields
         && !(useCache && isQuotaSet())) {
       ReadOnlyList<INode> childrenList = getChildrenList(lastSnapshotId);
       for (INode child : childrenList) {
-        child.computeQuotaUsage(counts, useCache, lastSnapshotId);
+        child.computeQuotaUsage(bsps, counts, useCache, lastSnapshotId);
       }
-      counts.add(Quota.NAMESPACE, 1);
+      counts.addNameSpace(1);
       return counts;
     }
     
@@ -582,27 +591,28 @@ public class INodeDirectory extends INodeWithAdditionalFields
       return q.addNamespaceDiskspace(counts);
     } else {
       useCache = q != null && !q.isQuotaSet() ? false : useCache;
-      return computeDirectoryQuotaUsage(counts, useCache, lastSnapshotId);
+      return computeDirectoryQuotaUsage(bsps, counts, useCache, lastSnapshotId);
     }
   }
 
-  private Quota.Counts computeDirectoryQuotaUsage(Quota.Counts counts,
-      boolean useCache, int lastSnapshotId) {
+  private QuotaCounts computeDirectoryQuotaUsage(BlockStoragePolicySuite bsps,
+      QuotaCounts counts, boolean useCache, int lastSnapshotId) {
     if (children != null) {
       for (INode child : children) {
-        child.computeQuotaUsage(counts, useCache, lastSnapshotId);
+        child.computeQuotaUsage(bsps, counts, useCache, lastSnapshotId);
       }
     }
-    return computeQuotaUsage4CurrentDirectory(counts);
+    return computeQuotaUsage4CurrentDirectory(bsps, counts);
   }
   
   /** Add quota usage for this inode excluding children. */
-  public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
-    counts.add(Quota.NAMESPACE, 1);
+  public QuotaCounts computeQuotaUsage4CurrentDirectory(
+      BlockStoragePolicySuite bsps, QuotaCounts counts) {
+    counts.addNameSpace(1);
     // include the diff list
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     if (sf != null) {
-      sf.computeQuotaUsage4CurrentDirectory(counts);
+      sf.computeQuotaUsage4CurrentDirectory(bsps, counts);
     }
     return counts;
   }
@@ -612,7 +622,8 @@ public class INodeDirectory extends INodeWithAdditionalFields
       ContentSummaryComputationContext summary) {
     final DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     if (sf != null) {
-      sf.computeContentSummary4Snapshot(summary.getCounts());
+      sf.computeContentSummary4Snapshot(summary.getBlockStoragePolicySuite(),
+          summary.getCounts());
     }
     final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
     if (q != null) {
@@ -702,7 +713,8 @@ public class INodeDirectory extends INodeWithAdditionalFields
    * (with OVERWRITE option) removes a file/dir from the dst tree, add it back
    * and delete possible record in the deleted list.  
    */
-  public void undoRename4DstParent(final INode deletedChild,
+  public void undoRename4DstParent(final BlockStoragePolicySuite bsps,
+      final INode deletedChild,
       int latestSnapshotId) throws QuotaExceededException {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     Preconditions.checkState(sf != null,
@@ -714,9 +726,9 @@ public class INodeDirectory extends INodeWithAdditionalFields
     // update quota usage if adding is successfully and the old child has not
     // been stored in deleted list before
     if (added && !removeDeletedChild) {
-      final Quota.Counts counts = deletedChild.computeQuotaUsage();
-      addSpaceConsumed(counts.get(Quota.NAMESPACE),
-          counts.get(Quota.DISKSPACE), false);
+      final QuotaCounts counts = deletedChild.computeQuotaUsage(bsps);
+      addSpaceConsumed(counts, false);
+
     }
   }
 
@@ -732,10 +744,11 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
 
   /** Call cleanSubtree(..) recursively down the subtree. */
-  public Quota.Counts cleanSubtreeRecursively(final int snapshot,
+  public QuotaCounts cleanSubtreeRecursively(final BlockStoragePolicySuite bsps,
+      final int snapshot,
       int prior, final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes, final Map<INode, INode> excludedNodes) {
-    Quota.Counts counts = Quota.Counts.newInstance();
+    QuotaCounts counts = new QuotaCounts.Builder().build();
     // in case of deletion snapshot, since this call happens after we modify
     // the diff list, the snapshot to be deleted has been combined or renamed
     // to its latest previous snapshot. (besides, we also need to consider nodes
@@ -748,7 +761,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
           && excludedNodes.containsKey(child)) {
         continue;
       } else {
-        Quota.Counts childCounts = child.cleanSubtree(snapshot, prior,
+        QuotaCounts childCounts = child.cleanSubtree(bsps, snapshot, prior,
             collectedBlocks, removedINodes);
         counts.add(childCounts);
       }
@@ -757,14 +770,15 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
 
   @Override
-  public void destroyAndCollectBlocks(final BlocksMapUpdateInfo collectedBlocks,
+  public void destroyAndCollectBlocks(final BlockStoragePolicySuite bsps,
+      final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
     final DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     if (sf != null) {
-      sf.clear(this, collectedBlocks, removedINodes);
+      sf.clear(bsps, this, collectedBlocks, removedINodes);
     }
     for (INode child : getChildrenList(Snapshot.CURRENT_STATE_ID)) {
-      child.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+      child.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
     }
     if (getAclFeature() != null) {
       AclStorage.removeAclFeature(getAclFeature());
@@ -774,30 +788,30 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
   
   @Override
-  public Quota.Counts cleanSubtree(final int snapshotId, int priorSnapshotId,
+  public QuotaCounts cleanSubtree(final BlockStoragePolicySuite bsps,
+      final int snapshotId, int priorSnapshotId,
       final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     // there is snapshot data
     if (sf != null) {
-      return sf.cleanDirectory(this, snapshotId, priorSnapshotId,
+      return sf.cleanDirectory(bsps, this, snapshotId, priorSnapshotId,
           collectedBlocks, removedINodes);
     }
     // there is no snapshot data
     if (priorSnapshotId == Snapshot.NO_SNAPSHOT_ID
         && snapshotId == Snapshot.CURRENT_STATE_ID) {
       // destroy the whole subtree and collect blocks that should be deleted
-      Quota.Counts counts = Quota.Counts.newInstance();
-      this.computeQuotaUsage(counts, true);
-      destroyAndCollectBlocks(collectedBlocks, removedINodes);
+      QuotaCounts counts = new QuotaCounts.Builder().build();
+      this.computeQuotaUsage(bsps, counts, true);
+      destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
       return counts; 
     } else {
       // process recursively down the subtree
-      Quota.Counts counts = cleanSubtreeRecursively(snapshotId, priorSnapshotId,
+      QuotaCounts counts = cleanSubtreeRecursively(bsps, snapshotId, priorSnapshotId,
           collectedBlocks, removedINodes, null);
       if (isQuotaSet()) {
-        getDirectoryWithQuotaFeature().addSpaceConsumed2Cache(
-            -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+        getDirectoryWithQuotaFeature().addSpaceConsumed2Cache(counts.negation());
       }
       return counts;
     }
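
setQuota() above now takes a BlockStoragePolicySuite and an optional StorageType: when a type is given, only that type's quota on the DirectoryWithQuotaFeature changes, otherwise the namespace/diskspace pair is set as before. A standalone sketch of that branching; QuotaFeature below is a stand-in, not the real DirectoryWithQuotaFeature:

import java.util.EnumMap;
import java.util.Map;

import org.apache.hadoop.hdfs.StorageType;

public class SetQuotaSketch {
  static class QuotaFeature {
    long nsQuota = -1;
    long dsQuota = -1;
    Map<StorageType, Long> typeQuota = new EnumMap<StorageType, Long>(StorageType.class);

    void setQuota(long ns, long ds) { nsQuota = ns; dsQuota = ds; }
    void setQuota(long ds, StorageType type) { typeQuota.put(type, ds); }
  }

  static void setQuota(QuotaFeature quota, long nsQuota, long dsQuota,
      StorageType type) {
    if (type != null) {
      // a storage type was named: the "diskspace" argument carries that
      // type's quota and nothing else is touched
      quota.setQuota(dsQuota, type);
    } else {
      // classic path: namespace and diskspace quotas set together
      quota.setQuota(nsQuota, dsQuota);
    }
  }

  public static void main(String[] args) {
    QuotaFeature q = new QuotaFeature();
    setQuota(q, 10000, 1L << 40, null);               // ns + ds quota
    setQuota(q, -1, 100L << 30, StorageType.SSD);     // 100 GB on SSD only
    System.out.println("ns=" + q.nsQuota + " ds=" + q.dsQuota
        + " ssd=" + q.typeQuota.get(StorageType.SSD));
  }
}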

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
index 26a6678..83649ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 
 import com.google.common.base.Preconditions;
 
@@ -27,7 +29,7 @@ import com.google.common.base.Preconditions;
  */
 @InterfaceAudience.Private
 public interface INodeDirectoryAttributes extends INodeAttributes {
-  public Quota.Counts getQuotaCounts();
+  public QuotaCounts getQuotaCounts();
 
   public boolean metadataEquals(INodeDirectoryAttributes other);
   
@@ -45,8 +47,9 @@ public interface INodeDirectoryAttributes extends INodeAttributes {
     }
 
     @Override
-    public Quota.Counts getQuotaCounts() {
-      return Quota.Counts.newInstance(-1, -1);
+    public QuotaCounts getQuotaCounts() {
+      return new QuotaCounts.Builder().nameCount(-1).
+          spaceCount(-1).typeCounts(-1).build();
     }
 
     @Override
@@ -60,29 +63,26 @@ public interface INodeDirectoryAttributes extends INodeAttributes {
   }
 
   public static class CopyWithQuota extends INodeDirectoryAttributes.SnapshotCopy {
-    private final long nsQuota;
-    private final long dsQuota;
-
+    private QuotaCounts quota;
 
     public CopyWithQuota(byte[] name, PermissionStatus permissions,
         AclFeature aclFeature, long modificationTime, long nsQuota,
-        long dsQuota, XAttrFeature xAttrsFeature) {
+        long dsQuota, EnumCounters<StorageType> typeQuotas, XAttrFeature xAttrsFeature) {
       super(name, permissions, aclFeature, modificationTime, xAttrsFeature);
-      this.nsQuota = nsQuota;
-      this.dsQuota = dsQuota;
+      this.quota = new QuotaCounts.Builder().nameCount(nsQuota).
+          spaceCount(dsQuota).typeCounts(typeQuotas).build();
     }
 
     public CopyWithQuota(INodeDirectory dir) {
       super(dir);
       Preconditions.checkArgument(dir.isQuotaSet());
-      final Quota.Counts q = dir.getQuotaCounts();
-      this.nsQuota = q.get(Quota.NAMESPACE);
-      this.dsQuota = q.get(Quota.DISKSPACE);
+      final QuotaCounts q = dir.getQuotaCounts();
+      this.quota = new QuotaCounts.Builder().quotaCount(q).build();
     }
-    
+
     @Override
-    public Quota.Counts getQuotaCounts() {
-      return Quota.Counts.newInstance(nsQuota, dsQuota);
+    public QuotaCounts getQuotaCounts() {
+      return new QuotaCounts.Builder().quotaCount(quota).build();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 81f6ae5..8660947 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.util.LongBitFormat;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -488,21 +490,22 @@ public class INodeFile extends INodeWithAdditionalFields
   }
 
   @Override
-  public Quota.Counts cleanSubtree(final int snapshot, int priorSnapshotId,
+  public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps, final int snapshot,
+                                  int priorSnapshotId,
       final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
     FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
     if (sf != null) {
-      return sf.cleanFile(this, snapshot, priorSnapshotId, collectedBlocks,
+      return sf.cleanFile(bsps, this, snapshot, priorSnapshotId, collectedBlocks,
           removedINodes);
     }
-    Quota.Counts counts = Quota.Counts.newInstance();
+    QuotaCounts counts = new QuotaCounts.Builder().build();
     if (snapshot == CURRENT_STATE_ID) {
       if (priorSnapshotId == NO_SNAPSHOT_ID) {
         // this only happens when deleting the current file and the file is not
         // in any snapshot
-        computeQuotaUsage(counts, false);
-        destroyAndCollectBlocks(collectedBlocks, removedINodes);
+        computeQuotaUsage(bsps, counts, false);
+        destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
       } else {
        // when deleting the current file and the file is in snapshot, we should
         // clean the 0-sized block if the file is UC
@@ -516,8 +519,8 @@ public class INodeFile extends INodeWithAdditionalFields
   }
 
   @Override
-  public void destroyAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+  public void destroyAndCollectBlocks(BlockStoragePolicySuite bsps,
+      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
     if (blocks != null && collectedBlocks != null) {
       for (BlockInfoContiguous blk : blocks) {
         collectedBlocks.addDeleteBlock(blk);
@@ -543,11 +546,15 @@ public class INodeFile extends INodeWithAdditionalFields
     return getFullPathName();
   }
 
+  // This is the only place that needs to use the BlockStoragePolicySuite to
+  // derive the intended storage type usage for quota by storage type
   @Override
-  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean useCache, int lastSnapshotId) {
+  public final QuotaCounts computeQuotaUsage(
+      BlockStoragePolicySuite bsps, QuotaCounts counts, boolean useCache,
+      int lastSnapshotId) {
     long nsDelta = 1;
-    final long dsDelta;
+    final long dsDeltaNoReplication;
+    short dsReplication;
     FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
     if (sf != null) {
       FileDiffList fileDiffList = sf.getDiffs();
@@ -555,18 +562,33 @@ public class INodeFile extends INodeWithAdditionalFields
 
       if (lastSnapshotId == Snapshot.CURRENT_STATE_ID
           || last == Snapshot.CURRENT_STATE_ID) {
-        dsDelta = diskspaceConsumed();
+        dsDeltaNoReplication = diskspaceConsumedNoReplication();
+        dsReplication = getBlockReplication();
       } else if (last < lastSnapshotId) {
-        dsDelta = computeFileSize(true, false) * getFileReplication();
-      } else {      
+        dsDeltaNoReplication = computeFileSize(true, false);
+        dsReplication = getFileReplication();
+      } else {
         int sid = fileDiffList.getSnapshotById(lastSnapshotId);
-        dsDelta = diskspaceConsumed(sid);
+        dsDeltaNoReplication = diskspaceConsumedNoReplication(sid);
+        dsReplication = getReplication(sid);
       }
     } else {
-      dsDelta = diskspaceConsumed();
+      dsDeltaNoReplication = diskspaceConsumedNoReplication();
+      dsReplication = getBlockReplication();
+    }
+    counts.addNameSpace(nsDelta);
+    counts.addDiskSpace(dsDeltaNoReplication * dsReplication);
+
+    if (getStoragePolicyID() != BlockStoragePolicySuite.ID_UNSPECIFIED){
+      BlockStoragePolicy bsp = bsps.getPolicy(getStoragePolicyID());
+      List<StorageType> storageTypes = bsp.chooseStorageTypes(dsReplication);
+      for (StorageType t : storageTypes) {
+        if (!t.supportTypeQuota()) {
+          continue;
+        }
+        counts.addTypeSpace(t, dsDeltaNoReplication);
+      }
     }
-    counts.add(Quota.NAMESPACE, nsDelta);
-    counts.add(Quota.DISKSPACE, dsDelta);
     return counts;
   }
 
@@ -660,9 +682,13 @@ public class INodeFile extends INodeWithAdditionalFields
    * Use preferred block size for the last block if it is under construction.
    */
   public final long diskspaceConsumed() {
+    return diskspaceConsumedNoReplication() * getBlockReplication();
+  }
+
+  public final long diskspaceConsumedNoReplication() {
     FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
     if(sf == null) {
-      return computeFileSize(true, true) * getBlockReplication();
+      return computeFileSize(true, true);
     }
 
     // Collect all distinct blocks
@@ -684,18 +710,34 @@ public class INodeFile extends INodeWithAdditionalFields
         lastBlock instanceof BlockInfoContiguousUnderConstruction) {
       size += getPreferredBlockSize() - lastBlock.getNumBytes();
     }
-    return size * getBlockReplication();
+    return size;
   }
 
   public final long diskspaceConsumed(int lastSnapshotId) {
     if (lastSnapshotId != CURRENT_STATE_ID) {
       return computeFileSize(lastSnapshotId)
-          * getFileReplication(lastSnapshotId);
+        * getFileReplication(lastSnapshotId);
     } else {
       return diskspaceConsumed();
     }
   }
-  
+
+  public final short getReplication(int lastSnapshotId) {
+    if (lastSnapshotId != CURRENT_STATE_ID) {
+      return getFileReplication(lastSnapshotId);
+    } else {
+      return getBlockReplication();
+    }
+  }
+
+  public final long diskspaceConsumedNoReplication(int lastSnapshotId) {
+    if (lastSnapshotId != CURRENT_STATE_ID) {
+      return computeFileSize(lastSnapshotId);
+    } else {
+      return diskspaceConsumedNoReplication();
+    }
+  }
+
   /**
    * Return the penultimate allocated block for this file.
    */
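
Per its comment, the new computeQuotaUsage() above is the one place that consults the BlockStoragePolicySuite: the file's storage policy picks one intended storage type per replica, and each type that enforces quota is charged the un-replicated file size. A small sketch of that derivation, using only calls that appear in this patch plus an assumed default policy suite and the ONE_SSD policy name:

import java.util.List;

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;

public class TypeUsageSketch {
  public static void main(String[] args) {
    BlockStoragePolicySuite suite = BlockStoragePolicySuite.createDefaultSuite();
    BlockStoragePolicy oneSsd = suite.getPolicy("ONE_SSD");   // assumed policy name
    short replication = 3;
    long fileSize = 256L << 20;                               // un-replicated bytes

    // one intended storage type per replica, e.g. [SSD, DISK, DISK]
    List<StorageType> chosen = oneSsd.chooseStorageTypes(replication);
    for (StorageType t : chosen) {
      if (!t.supportTypeQuota()) {   // transient types are not quota-enforced
        continue;
      }
      System.out.println("charge " + fileSize + " bytes against " + t + " quota");
    }
  }
}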

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index 8629bf8..cb270bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
-import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
 
@@ -97,17 +96,18 @@ public class INodeMap {
       }
       
       @Override
-      public void destroyAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks,
-          List<INode> removedINodes) {
+      public void destroyAndCollectBlocks(BlockStoragePolicySuite bsps,
+          BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes) {
         // Nothing to do
       }
-      
+
       @Override
-      public Counts computeQuotaUsage(Counts counts, boolean useCache,
-          int lastSnapshotId) {
+      public QuotaCounts computeQuotaUsage(
+          BlockStoragePolicySuite bsps, QuotaCounts counts,
+          boolean useCache, int lastSnapshotId) {
         return null;
       }
-      
+
       @Override
       public ContentSummaryComputationContext computeContentSummary(
           ContentSummaryComputationContext summary) {
@@ -115,9 +115,10 @@ public class INodeMap {
       }
       
       @Override
-      public Counts cleanSubtree(int snapshotId, int priorSnapshotId,
+      public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps,
+          int snapshotId, int priorSnapshotId,
           BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes) {
-        return null;
+          return null;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
index b39151a..f8c813c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
@@ -301,17 +302,19 @@ public abstract class INodeReference extends INode {
   }
 
   @Override // used by WithCount
-  public Quota.Counts cleanSubtree(int snapshot, int prior,
-      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
-    return referred.cleanSubtree(snapshot, prior, collectedBlocks,
+  public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps, int snapshot,
+      int prior, BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes) {
+    return referred.cleanSubtree(bsps, snapshot, prior, collectedBlocks,
         removedINodes);
   }
 
   @Override // used by WithCount
   public void destroyAndCollectBlocks(
+      BlockStoragePolicySuite bsps,
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
     if (removeReference(this) <= 0) {
-      referred.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+      referred.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
     }
   }
 
@@ -322,18 +325,19 @@ public abstract class INodeReference extends INode {
   }
 
   @Override
-  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
-      int lastSnapshotId) {
-    return referred.computeQuotaUsage(counts, useCache, lastSnapshotId);
+  public QuotaCounts computeQuotaUsage(
+    BlockStoragePolicySuite bsps,
+    QuotaCounts counts, boolean useCache, int lastSnapshotId) {
+    return referred.computeQuotaUsage(bsps, counts, useCache, lastSnapshotId);
   }
-  
+
   @Override
   public final INodeAttributes getSnapshotINode(int snapshotId) {
     return referred.getSnapshotINode(snapshotId);
   }
 
   @Override
-  public Quota.Counts getQuotaCounts() {
+  public QuotaCounts getQuotaCounts() {
     return referred.getQuotaCounts();
   }
 
@@ -506,15 +510,15 @@ public abstract class INodeReference extends INode {
     public final ContentSummaryComputationContext computeContentSummary(
         ContentSummaryComputationContext summary) {
       //only count diskspace for WithName
-      final Quota.Counts q = Quota.Counts.newInstance();
-      computeQuotaUsage(q, false, lastSnapshotId);
-      summary.getCounts().add(Content.DISKSPACE, q.get(Quota.DISKSPACE));
+      final QuotaCounts q = new QuotaCounts.Builder().build();
+      computeQuotaUsage(summary.getBlockStoragePolicySuite(), q, false, lastSnapshotId);
+      summary.getCounts().add(Content.DISKSPACE, q.getDiskSpace());
       return summary;
     }
 
     @Override
-    public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
-        boolean useCache, int lastSnapshotId) {
+    public final QuotaCounts computeQuotaUsage(BlockStoragePolicySuite bsps,
+        QuotaCounts counts, boolean useCache, int lastSnapshotId) {
       // if this.lastSnapshotId < lastSnapshotId, the rename of the referred 
       // node happened before the rename of its ancestor. This should be 
       // impossible since for WithName node we only count its children at the 
@@ -529,12 +533,12 @@ public abstract class INodeReference extends INode {
       // been updated by changes in the current tree.
       int id = lastSnapshotId != Snapshot.CURRENT_STATE_ID ? 
           lastSnapshotId : this.lastSnapshotId;
-      return referred.computeQuotaUsage(counts, false, id);
+      return referred.computeQuotaUsage(bsps, counts, false, id);
     }
     
     @Override
-    public Quota.Counts cleanSubtree(final int snapshot, int prior,
-        final BlocksMapUpdateInfo collectedBlocks,
+    public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps,
+        final int snapshot, int prior, final BlocksMapUpdateInfo collectedBlocks,
         final List<INode> removedINodes) {
       // since WithName node resides in deleted list acting as a snapshot copy,
       // the parameter snapshot must be non-null
@@ -547,16 +551,15 @@ public abstract class INodeReference extends INode {
       
       if (prior != Snapshot.NO_SNAPSHOT_ID
           && Snapshot.ID_INTEGER_COMPARATOR.compare(snapshot, prior) <= 0) {
-        return Quota.Counts.newInstance();
+        return new QuotaCounts.Builder().build();
       }
 
-      Quota.Counts counts = getReferredINode().cleanSubtree(snapshot, prior,
+      QuotaCounts counts = getReferredINode().cleanSubtree(bsps, snapshot, prior,
           collectedBlocks, removedINodes);
       INodeReference ref = getReferredINode().getParentReference();
       if (ref != null) {
         try {
-          ref.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
-              -counts.get(Quota.DISKSPACE), true);
+          ref.addSpaceConsumed(counts.negation(), true);
         } catch (QuotaExceededException e) {
           Log.warn("Should not have QuotaExceededException");
         }
@@ -567,17 +570,18 @@ public abstract class INodeReference extends INode {
         // in all the nodes existing at the time of the corresponding rename op.
         // Thus if we are deleting a snapshot before/at the snapshot associated
         // with lastSnapshotId, we do not need to update the quota upwards.
-        counts = Quota.Counts.newInstance();
+        counts = new QuotaCounts.Builder().build();
       }
       return counts;
     }
     
     @Override
-    public void destroyAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks,
+    public void destroyAndCollectBlocks(BlockStoragePolicySuite bsps,
+        BlocksMapUpdateInfo collectedBlocks,
         final List<INode> removedINodes) {
       int snapshot = getSelfSnapshot();
       if (removeReference(this) <= 0) {
-        getReferredINode().destroyAndCollectBlocks(collectedBlocks,
+        getReferredINode().destroyAndCollectBlocks(bsps, collectedBlocks,
             removedINodes);
       } else {
         int prior = getPriorSnapshot(this);
@@ -597,12 +601,11 @@ public abstract class INodeReference extends INode {
             return;
           }
           try {
-            Quota.Counts counts = referred.cleanSubtree(snapshot, prior,
+            QuotaCounts counts = referred.cleanSubtree(bsps, snapshot, prior,
                 collectedBlocks, removedINodes);
             INodeReference ref = getReferredINode().getParentReference();
             if (ref != null) {
-              ref.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
-                  -counts.get(Quota.DISKSPACE), true);
+              ref.addSpaceConsumed(counts.negation(), true);
             }
           } catch (QuotaExceededException e) {
             LOG.error("should not exceed quota while snapshot deletion", e);
@@ -653,13 +656,13 @@ public abstract class INodeReference extends INode {
     }
     
     @Override
-    public Quota.Counts cleanSubtree(int snapshot, int prior,
+    public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps, int snapshot, int prior,
         BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes) {
       if (snapshot == Snapshot.CURRENT_STATE_ID
           && prior == Snapshot.NO_SNAPSHOT_ID) {
-        Quota.Counts counts = Quota.Counts.newInstance();
-        this.computeQuotaUsage(counts, true);
-        destroyAndCollectBlocks(collectedBlocks, removedINodes);
+        QuotaCounts counts = new QuotaCounts.Builder().build();
+        this.computeQuotaUsage(bsps, counts, true);
+        destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
         return counts;
       } else {
         // if prior is NO_SNAPSHOT_ID, we need to check snapshot belonging to 
@@ -673,9 +676,9 @@ public abstract class INodeReference extends INode {
         if (snapshot != Snapshot.CURRENT_STATE_ID
             && prior != Snapshot.NO_SNAPSHOT_ID
             && Snapshot.ID_INTEGER_COMPARATOR.compare(snapshot, prior) <= 0) {
-          return Quota.Counts.newInstance();
+          return new QuotaCounts.Builder().build();
         }
-        return getReferredINode().cleanSubtree(snapshot, prior,
+        return getReferredINode().cleanSubtree(bsps, snapshot, prior,
             collectedBlocks, removedINodes);
       }
     }
@@ -691,10 +694,10 @@ public abstract class INodeReference extends INode {
      * WithName nodes.
      */
     @Override
-    public void destroyAndCollectBlocks(
+    public void destroyAndCollectBlocks(BlockStoragePolicySuite bsps,
         BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
       if (removeReference(this) <= 0) {
-        getReferredINode().destroyAndCollectBlocks(collectedBlocks,
+        getReferredINode().destroyAndCollectBlocks(bsps, collectedBlocks,
             removedINodes);
       } else {
         // we will clean everything, including files, directories, and 
@@ -717,7 +720,7 @@ public abstract class INodeReference extends INode {
           // when calling cleanSubtree of the referred node, since we
           // compute quota usage updates before calling this destroy
           // function, we use true for countDiffChange
-          referred.cleanSubtree(snapshot, prior, collectedBlocks,
+          referred.cleanSubtree(bsps, snapshot, prior, collectedBlocks,
               removedINodes);
         } else if (referred.isDirectory()) {
           // similarly, if referred is a directory, it must be an
@@ -725,7 +728,7 @@ public abstract class INodeReference extends INode {
           INodeDirectory dir = referred.asDirectory();
           Preconditions.checkState(dir.isWithSnapshot());
           try {
-            DirectoryWithSnapshotFeature.destroyDstSubtree(dir, snapshot,
+            DirectoryWithSnapshotFeature.destroyDstSubtree(bsps, dir, snapshot,
                 prior, collectedBlocks, removedINodes);
           } catch (QuotaExceededException e) {
             LOG.error("should not exceed quota while snapshot deletion", e);
@@ -733,7 +736,7 @@ public abstract class INodeReference extends INode {
         }
       }
     }
-    
+
     private int getSelfSnapshot(final int prior) {
       WithCount wc = (WithCount) getReferredINode().asReference();
       INode referred = wc.getReferredINode();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
index 617c99a..ef30ed7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 /**
@@ -72,26 +73,29 @@ public class INodeSymlink extends INodeWithAdditionalFields {
   }
   
   @Override
-  public Quota.Counts cleanSubtree(final int snapshotId, int priorSnapshotId,
+  public QuotaCounts cleanSubtree(BlockStoragePolicySuite bsps,
+      final int snapshotId, int priorSnapshotId,
       final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
     if (snapshotId == Snapshot.CURRENT_STATE_ID
         && priorSnapshotId == Snapshot.NO_SNAPSHOT_ID) {
-      destroyAndCollectBlocks(collectedBlocks, removedINodes);
+      destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
     }
-    return Quota.Counts.newInstance(1, 0);
+    return new QuotaCounts.Builder().nameCount(1).build();
   }
   
   @Override
-  public void destroyAndCollectBlocks(final BlocksMapUpdateInfo collectedBlocks,
+  public void destroyAndCollectBlocks(final BlockStoragePolicySuite bsps,
+      final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
     removedINodes.add(this);
   }
 
   @Override
-  public Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean updateCache, int lastSnapshotId) {
-    counts.add(Quota.NAMESPACE, 1);
+  public QuotaCounts computeQuotaUsage(
+      BlockStoragePolicySuite bsps,
+      QuotaCounts counts, boolean useCache, int lastSnapshotId) {
+    counts.addNameSpace(1);
     return counts;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
index 848fa33..d235e2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
@@ -71,7 +71,8 @@ public class NameNodeLayoutVersion {
     XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
     BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
     TRUNCATE(-61, "Truncate"),
-    APPEND_NEW_BLOCK(-62, "Support appending to new block");
+    APPEND_NEW_BLOCK(-62, "Support appending to new block"),
+    QUOTA_BY_STORAGE_TYPE(-63, "Support quota for specific storage types");
 
     private final FeatureInfo info;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 1c434e8..9d8199a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -68,7 +68,6 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
-import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AclException;
@@ -137,6 +136,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -1196,11 +1196,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
                        StorageType type)
       throws IOException {
     checkNNStartup();
-    if (type != null) {
-      throw new UnsupportedActionException(
-          "Quota by storage type support is not fully supported by namenode yet.");
-    }
-    namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
+    namesystem.setQuota(path, namespaceQuota, diskspaceQuota, type);
   }
   
   @Override // ClientProtocol
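
With the UnsupportedActionException removed, the RPC now forwards the storage type to FSNamesystem. A hedged sketch of the client-side contract implied by the ClientProtocol#setQuota(String, long, long, StorageType) reference earlier in this patch; obtaining the ClientProtocol proxy is out of scope here, and when a type is supplied the diskspace argument carries that type's quota (the namespace argument is left at QUOTA_DONT_SET on the assumption that it is ignored on this path):

import java.io.IOException;

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public class SetQuotaRpcSketch {
  // Constrain only the SSD usage under 'dir'; namespace and overall
  // diskspace quotas are left untouched via QUOTA_DONT_SET.
  static void limitSsd(ClientProtocol proxy, String dir, long ssdBytes)
      throws IOException {
    proxy.setQuota(dir, HdfsConstants.QUOTA_DONT_SET, ssdBytes, StorageType.SSD);
  }
}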

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
index 7abd017..6121bcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
@@ -39,7 +39,7 @@ public enum Quota {
     public static Counts newInstance() {
       return newInstance(0, 0);
     }
-    
+
     Counts() {
       super(Quota.class);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/QuotaByStorageTypeEntry.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/QuotaByStorageTypeEntry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/QuotaByStorageTypeEntry.java
new file mode 100644
index 0000000..d115acc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/QuotaByStorageTypeEntry.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.hdfs.StorageType;
+
+ public class QuotaByStorageTypeEntry {
+   private StorageType type;
+   private long quota;
+
+   public StorageType getStorageType() {
+     return type;
+   }
+
+   public long getQuota() {
+     return quota;
+   }
+
+   @Override
+   public boolean equals(Object o){
+     if (o == null) {
+       return false;
+     }
+     if (getClass() != o.getClass()) {
+       return false;
+     }
+     QuotaByStorageTypeEntry other = (QuotaByStorageTypeEntry)o;
+     return Objects.equal(type, other.type) && Objects.equal(quota, other.quota);
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hashCode(type, quota);
+   }
+
+   @Override
+   public String toString() {
+     StringBuilder sb = new StringBuilder();
+     assert (type != null);
+     sb.append(type.toString().toLowerCase());
+     sb.append(':');
+     sb.append(quota);
+     return sb.toString();
+   }
+
+   public static class Builder {
+     private StorageType type;
+     private long quota;
+
+     public Builder setStorageType(StorageType type) {
+       this.type = type;
+       return this;
+     }
+
+     public Builder setQuota(long quota) {
+       this.quota = quota;
+       return this;
+     }
+
+     public QuotaByStorageTypeEntry build() {
+       return new QuotaByStorageTypeEntry(type, quota);
+     }
+   }
+
+   private QuotaByStorageTypeEntry(StorageType type, long quota) {
+     this.type = type;
+     this.quota = quota;
+   }
+ }
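
For reference, the Builder above produces immutable entries; a brief
illustrative use (the 10 GB value is an arbitrary example, not from this
patch):

    QuotaByStorageTypeEntry ssdLimit = new QuotaByStorageTypeEntry.Builder()
        .setStorageType(StorageType.SSD)
        .setQuota(10L * 1024 * 1024 * 1024)   // 10 GB on SSD
        .build();
    // toString() lower-cases the type, so this prints "ssd:10737418240"
    System.out.println(ssdLimit);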

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/QuotaCounts.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/QuotaCounts.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/QuotaCounts.java
new file mode 100644
index 0000000..9b306b0
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/QuotaCounts.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+
+/**
+ * Counters for namespace, space and storage type quota and usage.
+ */
+public class QuotaCounts {
+
+  private EnumCounters<Quota> nsSpCounts;
+  private EnumCounters<StorageType> typeCounts;
+
+  public static class Builder {
+    private EnumCounters<Quota> nsSpCounts;
+    private EnumCounters<StorageType> typeCounts;
+
+    public Builder() {
+      this.nsSpCounts = new EnumCounters<Quota>(Quota.class);
+      this.typeCounts = new EnumCounters<StorageType>(StorageType.class);
+    }
+
+    public Builder nameCount(long val) {
+      this.nsSpCounts.set(Quota.NAMESPACE, val);
+      return this;
+    }
+
+    public Builder spaceCount(long val) {
+      this.nsSpCounts.set(Quota.DISKSPACE, val);
+      return this;
+    }
+
+    public Builder typeCounts(EnumCounters<StorageType> val) {
+      if (val != null) {
+        this.typeCounts.set(val);
+      }
+      return this;
+    }
+
+    public Builder typeCounts(long val) {
+      this.typeCounts.reset(val);
+      return this;
+    }
+
+    public Builder quotaCount(QuotaCounts that) {
+      this.nsSpCounts.set(that.nsSpCounts);
+      this.typeCounts.set(that.typeCounts);
+      return this;
+    }
+
+    public QuotaCounts build() {
+      return new QuotaCounts(this);
+    }
+  }
+
+  private QuotaCounts(Builder builder) {
+    this.nsSpCounts = builder.nsSpCounts;
+    this.typeCounts = builder.typeCounts;
+  }
+
+  public void add(QuotaCounts that) {
+    this.nsSpCounts.add(that.nsSpCounts);
+    this.typeCounts.add(that.typeCounts);
+  }
+
+  public void subtract(QuotaCounts that) {
+    this.nsSpCounts.subtract(that.nsSpCounts);
+    this.typeCounts.subtract(that.typeCounts);
+  }
+
+  /**
+   * Returns a QuotaCounts whose value is {@code (-this)}.
+   *
+   * @return {@code -this}
+   */
+  public QuotaCounts negation() {
+    QuotaCounts ret = new QuotaCounts.Builder().quotaCount(this).build();
+    ret.nsSpCounts.negation();
+    ret.typeCounts.negation();
+    return ret;
+  }
+
+  public long getNameSpace(){
+    return nsSpCounts.get(Quota.NAMESPACE);
+  }
+
+  public void setNameSpace(long nameSpaceCount) {
+    this.nsSpCounts.set(Quota.NAMESPACE, nameSpaceCount);
+  }
+
+  public void addNameSpace(long nsDelta) {
+    this.nsSpCounts.add(Quota.NAMESPACE, nsDelta);
+  }
+
+  public long getDiskSpace(){
+    return nsSpCounts.get(Quota.DISKSPACE);
+  }
+
+  public void setDiskSpace(long spaceCount) {
+    this.nsSpCounts.set(Quota.DISKSPACE, spaceCount);
+  }
+
+  public void addDiskSpace(long dsDelta) {
+    this.nsSpCounts.add(Quota.DISKSPACE, dsDelta);
+  }
+
+  public EnumCounters<StorageType> getTypeSpaces() {
+    EnumCounters<StorageType> ret =
+        new EnumCounters<StorageType>(StorageType.class);
+    ret.set(typeCounts);
+    return ret;
+  }
+
+  void setTypeSpaces(EnumCounters<StorageType> that) {
+    if (that != null) {
+      this.typeCounts.set(that);
+    }
+  }
+
+  long getTypeSpace(StorageType type) {
+    return this.typeCounts.get(type);
+  }
+
+  void setTypeSpace(StorageType type, long spaceCount) {
+    this.typeCounts.set(type, spaceCount);
+  }
+
+  public void addTypeSpace(StorageType type, long delta) {
+    this.typeCounts.add(type, delta);
+  }
+
+  public void addTypeSpaces(EnumCounters<StorageType> deltas) {
+    this.typeCounts.add(deltas);
+  }
+
+  public boolean anyNsSpCountGreaterOrEqual(long val) {
+    return nsSpCounts.anyGreaterOrEqual(val);
+  }
+
+  public boolean anyTypeCountGreaterOrEqual(long val) {
+    return typeCounts.anyGreaterOrEqual(val);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    } else if (obj == null || !(obj instanceof QuotaCounts)) {
+      return false;
+    }
+    final QuotaCounts that = (QuotaCounts)obj;
+    return this.nsSpCounts.equals(that.nsSpCounts)
+        && this.typeCounts.equals(that.typeCounts);
+  }
+
+  @Override
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return 42; // any arbitrary constant will do
+  }
+}
\ No newline at end of file
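
A short sketch of how the new QuotaCounts type is meant to be used, pairing
the namespace/diskspace counters with per-StorageType counters; the concrete
numbers are illustrative only:

    QuotaCounts delta = new QuotaCounts.Builder()
        .nameCount(1)                          // one inode added
        .spaceCount(3 * 128L * 1024 * 1024)    // a full 128 MB block at replication 3
        .build();
    delta.addTypeSpace(StorageType.SSD, 128L * 1024 * 1024);  // raw bytes on SSD
    QuotaCounts rollback = delta.negation();   // (-delta), as removeSnapshot() does below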

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
index 04e4fc9..691d717 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
@@ -21,10 +21,11 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 
 import com.google.common.base.Preconditions;
@@ -114,17 +115,21 @@ abstract class AbstractINodeDiff<N extends INode,
   }
 
   /** Combine the posterior diff and collect blocks for deletion. */
-  abstract Quota.Counts combinePosteriorAndCollectBlocks(final N currentINode,
+  abstract QuotaCounts combinePosteriorAndCollectBlocks(
+      final BlockStoragePolicySuite bsps, final N currentINode,
       final D posterior, final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes);
   
   /**
    * Delete and clear self.
+   * @param bsps The block storage policy suite used to retrieve storage policy
    * @param currentINode The inode where the deletion happens.
    * @param collectedBlocks Used to collect blocks for deletion.
+   * @param removedINodes INodes removed
    * @return quota usage delta
    */
-  abstract Quota.Counts destroyDiffAndCollectBlocks(final N currentINode,
+  abstract QuotaCounts destroyDiffAndCollectBlocks(
+      final BlockStoragePolicySuite bsps, final N currentINode,
      final BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes);
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
index 8187d0b..5bd4ed5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
@@ -22,10 +22,11 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
 
 /**
  * A list of snapshot diffs for storing snapshot data.
@@ -66,13 +67,14 @@ abstract class AbstractINodeDiffList<N extends INode,
    * @param collectedBlocks Used to collect information for blocksMap update
    * @return delta in namespace. 
    */
-  public final Quota.Counts deleteSnapshotDiff(final int snapshot,
+  public final QuotaCounts deleteSnapshotDiff(BlockStoragePolicySuite bsps,
+      final int snapshot,
       final int prior, final N currentINode,
       final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
     int snapshotIndex = Collections.binarySearch(diffs, snapshot);
     
-    Quota.Counts counts = Quota.Counts.newInstance();
+    QuotaCounts counts = new QuotaCounts.Builder().build();
     D removed = null;
     if (snapshotIndex == 0) {
       if (prior != Snapshot.NO_SNAPSHOT_ID) { // there is still snapshot before
@@ -80,7 +82,7 @@ abstract class AbstractINodeDiffList<N extends INode,
         diffs.get(snapshotIndex).setSnapshotId(prior);
       } else { // there is no snapshot before
         removed = diffs.remove(0);
-        counts.add(removed.destroyDiffAndCollectBlocks(currentINode,
+        counts.add(removed.destroyDiffAndCollectBlocks(bsps, currentINode,
             collectedBlocks, removedINodes));
       }
     } else if (snapshotIndex > 0) {
@@ -95,7 +97,7 @@ abstract class AbstractINodeDiffList<N extends INode,
         }
 
         counts.add(previous.combinePosteriorAndCollectBlocks(
-            currentINode, removed, collectedBlocks, removedINodes));
+            bsps, currentINode, removed, collectedBlocks, removedINodes));
         previous.setPosterior(removed.getPosterior());
         removed.setPosterior(null);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
index 0ed99e6..5168f0b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.Content;
 import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
 import org.apache.hadoop.hdfs.server.namenode.INode;
@@ -39,7 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.Time;
@@ -202,7 +203,7 @@ public class DirectorySnapshottableFeature extends 
DirectoryWithSnapshotFeature
    * @return The removed snapshot. Null if no snapshot with the given name
    *         exists.
    */
-  public Snapshot removeSnapshot(INodeDirectory snapshotRoot,
+  public Snapshot removeSnapshot(BlockStoragePolicySuite bsps, INodeDirectory snapshotRoot,
       String snapshotName, BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) throws SnapshotException {
     final int i = searchSnapshot(DFSUtil.string2Bytes(snapshotName));
@@ -214,14 +215,13 @@ public class DirectorySnapshottableFeature extends 
DirectoryWithSnapshotFeature
       final Snapshot snapshot = snapshotsByNames.get(i);
       int prior = Snapshot.findLatestSnapshot(snapshotRoot, snapshot.getId());
       try {
-        Quota.Counts counts = snapshotRoot.cleanSubtree(snapshot.getId(),
+        QuotaCounts counts = snapshotRoot.cleanSubtree(bsps, snapshot.getId(),
             prior, collectedBlocks, removedINodes);
         INodeDirectory parent = snapshotRoot.getParent();
         if (parent != null) {
           // there will not be any WithName node corresponding to the deleted
           // snapshot, thus only update the quota usage in the current tree
-          parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
-              -counts.get(Quota.DISKSPACE), true);
+          parent.addSpaceConsumed(counts.negation(), true);
         }
       } catch(QuotaExceededException e) {
         INode.LOG.error("BUG: removeSnapshot increases namespace usage.", e);
@@ -233,6 +233,7 @@ public class DirectorySnapshottableFeature extends 
DirectoryWithSnapshotFeature
   }
 
   public ContentSummaryComputationContext computeContentSummary(
+      final BlockStoragePolicySuite bsps,
       final INodeDirectory snapshotRoot,
       final ContentSummaryComputationContext summary) {
     snapshotRoot.computeContentSummary(summary);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
index a0008de..07ff744 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.AclStorage;
 import org.apache.hadoop.hdfs.server.namenode.Content;
 import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
@@ -38,7 +39,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 import org.apache.hadoop.hdfs.util.Diff;
 import org.apache.hadoop.hdfs.util.Diff.Container;
@@ -94,14 +95,16 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
     }
 
     /** clear the created list */
-    private Quota.Counts destroyCreatedList(final INodeDirectory currentINode,
+    private QuotaCounts destroyCreatedList(
+        final BlockStoragePolicySuite bsps,
+        final INodeDirectory currentINode,
         final BlocksMapUpdateInfo collectedBlocks,
         final List<INode> removedINodes) {
-      Quota.Counts counts = Quota.Counts.newInstance();
+      QuotaCounts counts = new QuotaCounts.Builder().build();
       final List<INode> createdList = getList(ListType.CREATED);
       for (INode c : createdList) {
-        c.computeQuotaUsage(counts, true);
-        c.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+        c.computeQuotaUsage(bsps, counts, true);
+        c.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
         // c should be contained in the children list, remove it
         currentINode.removeChild(c);
       }
@@ -110,14 +113,15 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
     }
 
     /** clear the deleted list */
-    private Quota.Counts destroyDeletedList(
+    private QuotaCounts destroyDeletedList(
+        final BlockStoragePolicySuite bsps,
         final BlocksMapUpdateInfo collectedBlocks,
         final List<INode> removedINodes) {
-      Quota.Counts counts = Quota.Counts.newInstance();
+      QuotaCounts counts = new QuotaCounts.Builder().build();
       final List<INode> deletedList = getList(ListType.DELETED);
       for (INode d : deletedList) {
-        d.computeQuotaUsage(counts, false);
-        d.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+        d.computeQuotaUsage(bsps, counts, false);
+        d.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
       }
       deletedList.clear();
       return counts;
@@ -204,18 +208,19 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
     }
 
     @Override
-    Quota.Counts combinePosteriorAndCollectBlocks(
+    QuotaCounts combinePosteriorAndCollectBlocks(
+        final BlockStoragePolicySuite bsps,
         final INodeDirectory currentDir, final DirectoryDiff posterior,
         final BlocksMapUpdateInfo collectedBlocks,
         final List<INode> removedINodes) {
-      final Quota.Counts counts = Quota.Counts.newInstance();
+      final QuotaCounts counts = new QuotaCounts.Builder().build();
       diff.combinePosterior(posterior.diff, new Diff.Processor<INode>() {
         /** Collect blocks for deleted files. */
         @Override
         public void process(INode inode) {
           if (inode != null) {
-            inode.computeQuotaUsage(counts, false);
-            inode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+            inode.computeQuotaUsage(bsps, counts, false);
+            inode.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
           }
         }
       });
@@ -313,11 +318,12 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
     }
 
     @Override
-    Quota.Counts destroyDiffAndCollectBlocks(INodeDirectory currentINode,
+    QuotaCounts destroyDiffAndCollectBlocks(
+        BlockStoragePolicySuite bsps, INodeDirectory currentINode,
         BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
       // this diff has been deleted
-      Quota.Counts counts = Quota.Counts.newInstance();
-      counts.add(diff.destroyDeletedList(collectedBlocks, removedINodes));
+      QuotaCounts counts = new QuotaCounts.Builder().build();
+      counts.add(diff.destroyDeletedList(bsps, collectedBlocks, removedINodes));
       INodeDirectoryAttributes snapshotINode = getSnapshotINode();
       if (snapshotINode != null && snapshotINode.getAclFeature() != null) {
         AclStorage.removeAclFeature(snapshotINode.getAclFeature());
@@ -401,7 +407,8 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
   /**
    * Destroy a subtree under a DstReference node.
    */
-  public static void destroyDstSubtree(INode inode, final int snapshot,
+  public static void destroyDstSubtree(
+      final BlockStoragePolicySuite bsps, INode inode, final int snapshot,
       final int prior, final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) throws QuotaExceededException {
     Preconditions.checkArgument(prior != Snapshot.NO_SNAPSHOT_ID);
@@ -410,14 +417,14 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
           && snapshot != Snapshot.CURRENT_STATE_ID) {
         // this inode has been renamed before the deletion of the DstReference
         // subtree
-        inode.cleanSubtree(snapshot, prior, collectedBlocks, removedINodes);
+        inode.cleanSubtree(bsps, snapshot, prior, collectedBlocks, removedINodes);
       } else { 
         // for DstReference node, continue this process to its subtree
-        destroyDstSubtree(inode.asReference().getReferredINode(), snapshot,
+        destroyDstSubtree(bsps, inode.asReference().getReferredINode(), snapshot,
             prior, collectedBlocks, removedINodes);
       }
     } else if (inode.isFile()) {
-      inode.cleanSubtree(snapshot, prior, collectedBlocks, removedINodes);
+      inode.cleanSubtree(bsps, snapshot, prior, collectedBlocks, removedINodes);
     } else if (inode.isDirectory()) {
       Map<INode, INode> excludedNodes = null;
       INodeDirectory dir = inode.asDirectory();
@@ -431,12 +438,12 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
         }
         
         if (snapshot != Snapshot.CURRENT_STATE_ID) {
-          diffList.deleteSnapshotDiff(snapshot, prior, dir, collectedBlocks,
+          diffList.deleteSnapshotDiff(bsps, snapshot, prior, dir, collectedBlocks,
               removedINodes);
         }
         priorDiff = diffList.getDiffById(prior);
         if (priorDiff != null && priorDiff.getSnapshotId() == prior) {
-          priorDiff.diff.destroyCreatedList(dir, collectedBlocks,
+          priorDiff.diff.destroyCreatedList(bsps, dir, collectedBlocks,
               removedINodes);
         }
       }
@@ -444,7 +451,7 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
         if (excludedNodes != null && excludedNodes.containsKey(child)) {
           continue;
         }
-        destroyDstSubtree(child, snapshot, prior, collectedBlocks,
+        destroyDstSubtree(bsps, child, snapshot, prior, collectedBlocks,
             removedINodes);
       }
     }
@@ -453,17 +460,19 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
   /**
    * Clean an inode while we move it from the deleted list of post to the
    * deleted list of prior.
+   * @param bsps The block storage policy suite.
    * @param inode The inode to clean.
    * @param post The post snapshot.
    * @param prior The id of the prior snapshot.
    * @param collectedBlocks Used to collect blocks for later deletion.
    * @return Quota usage update.
    */
-  private static Quota.Counts cleanDeletedINode(INode inode,
+  private static QuotaCounts cleanDeletedINode(
+      final BlockStoragePolicySuite bsps, INode inode,
       final int post, final int prior,
       final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
-    Quota.Counts counts = Quota.Counts.newInstance();
+    QuotaCounts counts = new QuotaCounts.Builder().build();
     Deque<INode> queue = new ArrayDeque<INode>();
     queue.addLast(inode);
     while (!queue.isEmpty()) {
@@ -471,13 +480,13 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
       if (topNode instanceof INodeReference.WithName) {
         INodeReference.WithName wn = (INodeReference.WithName) topNode;
         if (wn.getLastSnapshotId() >= post) {
-          wn.cleanSubtree(post, prior, collectedBlocks, removedINodes);
+          wn.cleanSubtree(bsps, post, prior, collectedBlocks, removedINodes);
         }
         // For DstReference node, since the node is not in the created list of
         // prior, we should treat it as regular file/dir
       } else if (topNode.isFile() && topNode.asFile().isWithSnapshot()) {
         INodeFile file = topNode.asFile();
-        counts.add(file.getDiffs().deleteSnapshotDiff(post, prior, file,
+        counts.add(file.getDiffs().deleteSnapshotDiff(bsps, post, prior, file,
             collectedBlocks, removedINodes));
       } else if (topNode.isDirectory()) {
         INodeDirectory dir = topNode.asDirectory();
@@ -489,7 +498,7 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
           DirectoryDiff priorDiff = sf.getDiffs().getDiffById(prior);
           if (priorDiff != null && priorDiff.getSnapshotId() == prior) {
             priorChildrenDiff = priorDiff.getChildrenDiff();
-            counts.add(priorChildrenDiff.destroyCreatedList(dir,
+            counts.add(priorChildrenDiff.destroyCreatedList(bsps, dir,
                 collectedBlocks, removedINodes));
           }
         }
@@ -619,27 +628,29 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
     diff.diff.modify(snapshotCopy, child);
     return child;
   }
-  
-  public void clear(INodeDirectory currentINode,
+
+  public void clear(BlockStoragePolicySuite bsps, INodeDirectory currentINode,
      final BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
     // destroy its diff list
     for (DirectoryDiff diff : diffs) {
-      diff.destroyDiffAndCollectBlocks(currentINode, collectedBlocks,
-          removedINodes);
+      diff.destroyDiffAndCollectBlocks(bsps, currentINode, collectedBlocks,
+        removedINodes);
     }
     diffs.clear();
   }
-  
-  public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
+
+  public QuotaCounts computeQuotaUsage4CurrentDirectory(
+    BlockStoragePolicySuite bsps, QuotaCounts counts) {
     for(DirectoryDiff d : diffs) {
       for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) {
-        deleted.computeQuotaUsage(counts, false, Snapshot.CURRENT_STATE_ID);
+        deleted.computeQuotaUsage(bsps, counts, false, Snapshot.CURRENT_STATE_ID);
       }
     }
     return counts;
   }
-  
-  public void computeContentSummary4Snapshot(final Content.Counts counts) {
+
+  public void computeContentSummary4Snapshot(final BlockStoragePolicySuite bsps,
+      final Content.Counts counts) {
     // Create a new blank summary context for blocking processing of subtree.
     ContentSummaryComputationContext summary = 
         new ContentSummaryComputationContext();
@@ -706,11 +717,11 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
     }
   }
 
-  public Quota.Counts cleanDirectory(final INodeDirectory currentINode,
+  public QuotaCounts cleanDirectory(final BlockStoragePolicySuite bsps, final INodeDirectory currentINode,
       final int snapshot, int prior,
       final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
-    Quota.Counts counts = Quota.Counts.newInstance();
+    QuotaCounts counts = new QuotaCounts.Builder().build();
     Map<INode, INode> priorCreated = null;
     Map<INode, INode> priorDeleted = null;
    if (snapshot == Snapshot.CURRENT_STATE_ID) { // delete the current directory
@@ -718,10 +729,10 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
       // delete everything in created list
       DirectoryDiff lastDiff = diffs.getLast();
       if (lastDiff != null) {
-        counts.add(lastDiff.diff.destroyCreatedList(currentINode,
+        counts.add(lastDiff.diff.destroyCreatedList(bsps, currentINode,
             collectedBlocks, removedINodes));
       }
-      counts.add(currentINode.cleanSubtreeRecursively(snapshot, prior,
+      counts.add(currentINode.cleanSubtreeRecursively(bsps, snapshot, prior,
           collectedBlocks, removedINodes, priorDeleted));
     } else {
       // update prior
@@ -738,9 +749,9 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
         }
       }
       
-      counts.add(getDiffs().deleteSnapshotDiff(snapshot, prior,
+      counts.add(getDiffs().deleteSnapshotDiff(bsps, snapshot, prior,
           currentINode, collectedBlocks, removedINodes));
-      counts.add(currentINode.cleanSubtreeRecursively(snapshot, prior,
+      counts.add(currentINode.cleanSubtreeRecursively(bsps, snapshot, prior,
           collectedBlocks, removedINodes, priorDeleted));
 
       // check priorDiff again since it may be created during the diff deletion
@@ -757,7 +768,7 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
             for (INode cNode : priorDiff.getChildrenDiff().getList(
                 ListType.CREATED)) {
               if (priorCreated.containsKey(cNode)) {
-                counts.add(cNode.cleanSubtree(snapshot, Snapshot.NO_SNAPSHOT_ID,
+                counts.add(cNode.cleanSubtree(bsps, snapshot, Snapshot.NO_SNAPSHOT_ID,
                     collectedBlocks, removedINodes));
               }
             }
@@ -774,7 +785,7 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
           for (INode dNode : priorDiff.getChildrenDiff().getList(
               ListType.DELETED)) {
             if (priorDeleted == null || !priorDeleted.containsKey(dNode)) {
-              counts.add(cleanDeletedINode(dNode, snapshot, prior,
+              counts.add(cleanDeletedINode(bsps, dNode, snapshot, prior,
                   collectedBlocks, removedINodes));
             }
           }
@@ -784,7 +795,7 @@ public class DirectoryWithSnapshotFeature implements 
INode.Feature {
 
     if (currentINode.isQuotaSet()) {
       currentINode.getDirectoryWithQuotaFeature().addSpaceConsumed2Cache(
-          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+          counts.negation());
     }
     return counts;
   }
