http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index ebe4603..f9fc91c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@ -34,9 +34,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
@@ -66,12 +68,15 @@ import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
 import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.apache.hadoop.hdfs.server.namenode.QuotaByStorageTypeEntry;
 import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceContext;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
 import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
@@ -343,13 +348,31 @@ public class FSImageFormatPBSnapshot {
 
           long modTime = dirCopyInPb.getModificationTime();
           boolean noQuota = dirCopyInPb.getNsQuota() == -1
-              && dirCopyInPb.getDsQuota() == -1;
-
-          copy = noQuota ? new INodeDirectoryAttributes.SnapshotCopy(name,
-              permission, acl, modTime, xAttrs)
-              : new INodeDirectoryAttributes.CopyWithQuota(name, permission,
-                  acl, modTime, dirCopyInPb.getNsQuota(),
-                  dirCopyInPb.getDsQuota(), xAttrs);
+              && dirCopyInPb.getDsQuota() == -1
+              && (!dirCopyInPb.hasTypeQuotas());
+
+          if (noQuota) {
+            copy = new INodeDirectoryAttributes.SnapshotCopy(name,
+              permission, acl, modTime, xAttrs);
+          } else {
+            EnumCounters<StorageType> typeQuotas = null;
+            if (dirCopyInPb.hasTypeQuotas()) {
+              ImmutableList<QuotaByStorageTypeEntry> qes =
+                  FSImageFormatPBINode.Loader.loadQuotaByStorageTypeEntries(
+                      dirCopyInPb.getTypeQuotas());
+              typeQuotas = new EnumCounters<StorageType>(StorageType.class,
+                  HdfsConstants.QUOTA_RESET);
+              for (QuotaByStorageTypeEntry qe : qes) {
+                if (qe.getQuota() >= 0 && qe.getStorageType() != null &&
+                    qe.getStorageType().supportTypeQuota()) {
+                  typeQuotas.set(qe.getStorageType(), qe.getQuota());
+                }
+              }
+            }
+            copy = new INodeDirectoryAttributes.CopyWithQuota(name, permission,
+                acl, modTime, dirCopyInPb.getNsQuota(),
+                dirCopyInPb.getDsQuota(), typeQuotas, xAttrs);
+          }
         }
         // load created list
         List<INode> clist = loadCreatedList(in, dir,

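The loader change above is worth unpacking: per-type quotas for the snapshot copy default to HdfsConstants.QUOTA_RESET (-1, meaning "not set"), and a decoded entry is applied only when its quota is non-negative and its storage type supports type quotas. A minimal, self-contained sketch of that filtering, using a hypothetical MediaKind enum in place of the real StorageType:

    import java.util.EnumMap;
    import java.util.Map;

    public class TypeQuotaLoadSketch {
      // Stand-in for org.apache.hadoop.hdfs.StorageType; RAM_DISK does not
      // support per-type quotas, mirroring supportTypeQuota() in the patch.
      enum MediaKind {
        RAM_DISK(false), SSD(true), DISK(true), ARCHIVE(true);
        final boolean supportsTypeQuota;
        MediaKind(boolean s) { supportsTypeQuota = s; }
        boolean supportTypeQuota() { return supportsTypeQuota; }
      }

      static final long QUOTA_RESET = -1L; // same sentinel as HdfsConstants.QUOTA_RESET

      public static void main(String[] args) {
        // Every type starts at QUOTA_RESET, as in the EnumCounters constructor above.
        Map<MediaKind, Long> typeQuotas = new EnumMap<>(MediaKind.class);
        for (MediaKind k : MediaKind.values()) {
          typeQuotas.put(k, QUOTA_RESET);
        }
        // Pretend these (type, quota) pairs were decoded from the fsimage protobuf.
        Object[][] decoded = { { MediaKind.SSD, 10240L }, { MediaKind.RAM_DISK, 512L } };
        for (Object[] e : decoded) {
          MediaKind type = (MediaKind) e[0];
          long quota = (Long) e[1];
          // Negative quotas and unsupported types are skipped, as in the hunk above.
          if (quota >= 0 && type != null && type.supportTypeQuota()) {
            typeQuotas.put(type, quota);
          }
        }
        System.out.println(typeQuotas); // {RAM_DISK=-1, SSD=10240, DISK=-1, ARCHIVE=-1}
      }
    }
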
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java
index 10bba67..931f7f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java
@@ -23,12 +23,13 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 
 /**
@@ -80,13 +81,14 @@ public class FileDiff extends
   }
 
   @Override
-  Quota.Counts combinePosteriorAndCollectBlocks(INodeFile currentINode,
+  QuotaCounts combinePosteriorAndCollectBlocks(
+      BlockStoragePolicySuite bsps, INodeFile currentINode,
       FileDiff posterior, BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
     FileWithSnapshotFeature sf = currentINode.getFileWithSnapshotFeature();
     assert sf != null : "FileWithSnapshotFeature is null";
     return sf.updateQuotaAndCollectBlocks(
-        currentINode, posterior, collectedBlocks, removedINodes);
+        bsps, currentINode, posterior, collectedBlocks, removedINodes);
   }
   
   @Override
@@ -110,10 +112,10 @@ public class FileDiff extends
   }
 
   @Override
-  Quota.Counts destroyDiffAndCollectBlocks(INodeFile currentINode,
+  QuotaCounts destroyDiffAndCollectBlocks(BlockStoragePolicySuite bsps, INodeFile currentINode,
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
     return currentINode.getFileWithSnapshotFeature()
-        .updateQuotaAndCollectBlocks(currentINode, this, collectedBlocks,
+        .updateQuotaAndCollectBlocks(bsps, currentINode, this, collectedBlocks,
             removedINodes);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
index 5ea23dc..f9f5056 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -93,7 +94,7 @@ public class FileDiffList extends
    * up to the file length of the latter.
    * Collect unused blocks of the removed snapshot.
    */
-  void combineAndCollectSnapshotBlocks(INodeFile file,
+  void combineAndCollectSnapshotBlocks(BlockStoragePolicySuite bsps, INodeFile file,
                                        FileDiff removed,
                                        BlocksMapUpdateInfo collectedBlocks,
                                        List<INode> removedINodes) {
@@ -102,7 +103,7 @@ public class FileDiffList extends
       FileWithSnapshotFeature sf = file.getFileWithSnapshotFeature();
       assert sf != null : "FileWithSnapshotFeature is null";
       if(sf.isCurrentFileDeleted())
-        sf.collectBlocksAndClear(file, collectedBlocks, removedINodes);
+        sf.collectBlocksAndClear(bsps, file, collectedBlocks, removedINodes);
       return;
     }
     int p = getPrior(removed.getSnapshotId(), true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
index 419557b..6bcdbd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
@@ -21,13 +21,17 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.AclFeature;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.AclStorage;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 
 /**
  * Feature for file with snapshot-related information.
@@ -114,7 +118,8 @@ public class FileWithSnapshotFeature implements INode.Feature {
     return (isCurrentFileDeleted()? "(DELETED), ": ", ") + diffs;
   }
   
-  public Quota.Counts cleanFile(final INodeFile file, final int snapshotId,
+  public QuotaCounts cleanFile(final BlockStoragePolicySuite bsps,
+      final INodeFile file, final int snapshotId,
       int priorSnapshotId, final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
     if (snapshotId == Snapshot.CURRENT_STATE_ID) {
@@ -123,11 +128,11 @@ public class FileWithSnapshotFeature implements INode.Feature {
         file.recordModification(priorSnapshotId);
         deleteCurrentFile();
       }
-      collectBlocksAndClear(file, collectedBlocks, removedINodes);
-      return Quota.Counts.newInstance();
+      collectBlocksAndClear(bsps, file, collectedBlocks, removedINodes);
+      return new QuotaCounts.Builder().build();
     } else { // delete the snapshot
       priorSnapshotId = getDiffs().updatePrior(snapshotId, priorSnapshotId);
-      return diffs.deleteSnapshotDiff(snapshotId, priorSnapshotId, file,
+      return diffs.deleteSnapshotDiff(bsps, snapshotId, priorSnapshotId, file,
           collectedBlocks, removedINodes);
     }
   }
@@ -136,17 +141,52 @@ public class FileWithSnapshotFeature implements INode.Feature {
     this.diffs.clear();
   }
   
-  public Quota.Counts updateQuotaAndCollectBlocks(INodeFile file,
+  public QuotaCounts updateQuotaAndCollectBlocks(BlockStoragePolicySuite bsps, INodeFile file,
       FileDiff removed, BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes) {
     long oldDiskspace = file.diskspaceConsumed();
+
+    byte storagePolicyID = file.getStoragePolicyID();
+    BlockStoragePolicy bsp = null;
+    EnumCounters<StorageType> typeSpaces =
+        new EnumCounters<StorageType>(StorageType.class);
+    if (storagePolicyID != BlockStoragePolicySuite.ID_UNSPECIFIED) {
+      bsp = bsps.getPolicy(file.getStoragePolicyID());
+    }
+
     if (removed.snapshotINode != null) {
       short replication = removed.snapshotINode.getFileReplication();
       short currentRepl = file.getBlockReplication();
       if (currentRepl == 0) {
-        oldDiskspace = file.computeFileSize(true, true) * replication;
-      } else if (replication > currentRepl) {  
-        oldDiskspace = oldDiskspace / file.getBlockReplication() * replication;
+        long oldFileSizeNoRep = file.computeFileSize(true, true);
+        oldDiskspace =  oldFileSizeNoRep * replication;
+
+        if (bsp != null) {
+          List<StorageType> oldTypeChosen = bsp.chooseStorageTypes(replication);
+          for (StorageType t : oldTypeChosen) {
+            if (t.supportTypeQuota()) {
+              typeSpaces.add(t, -oldFileSizeNoRep);
+            }
+          }
+        }
+      } else if (replication > currentRepl) {
+        long oldFileSizeNoRep = file.diskspaceConsumedNoReplication();
+        oldDiskspace = oldFileSizeNoRep * replication;
+
+        if (bsp != null) {
+          List<StorageType> oldTypeChosen = bsp.chooseStorageTypes(replication);
+          for (StorageType t : oldTypeChosen) {
+            if (t.supportTypeQuota()) {
+              typeSpaces.add(t, -oldFileSizeNoRep);
+            }
+          }
+          List<StorageType> newTypeChosen = bsp.chooseStorageTypes(currentRepl);
+          for (StorageType t: newTypeChosen) {
+            if (t.supportTypeQuota()) {
+              typeSpaces.add(t, oldFileSizeNoRep);
+            }
+          }
+        }
       }
       AclFeature aclFeature = removed.getSnapshotINode().getAclFeature();
       if (aclFeature != null) {
@@ -155,21 +195,24 @@ public class FileWithSnapshotFeature implements INode.Feature {
     }
 
     getDiffs().combineAndCollectSnapshotBlocks(
-        file, removed, collectedBlocks, removedINodes);
+        bsps, file, removed, collectedBlocks, removedINodes);
 
     long dsDelta = oldDiskspace - file.diskspaceConsumed();
-    return Quota.Counts.newInstance(0, dsDelta);
+    return new QuotaCounts.Builder().
+        spaceCount(dsDelta).
+        typeCounts(typeSpaces).
+        build();
   }
 
   /**
    * If some blocks at the end of the block list no longer belongs to
    * any inode, collect them and update the block list.
    */
-  public void collectBlocksAndClear(final INodeFile file,
+  public void collectBlocksAndClear(final BlockStoragePolicySuite bsps, final INodeFile file,
       final BlocksMapUpdateInfo info, final List<INode> removedINodes) {
     // check if everything is deleted.
     if (isCurrentFileDeleted() && getDiffs().asList().isEmpty()) {
-      file.destroyAndCollectBlocks(info, removedINodes);
+      file.destroyAndCollectBlocks(bsps, info, removedINodes);
       return;
     }
     // find max file size.

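The replication branch in updateQuotaAndCollectBlocks above is easiest to follow with concrete numbers. A hedged sketch, not the Hadoop API: assume a ONE_SSD-style policy that places one replica on SSD and the rest on DISK, a snapshot recorded at replication 3, and a current replication of 2.

    public class TypeDeltaSketch {
      public static void main(String[] args) {
        long sizeNoRep = 1L << 20; // file size without replication: 1 MiB
        long ssdDelta = 0, diskDelta = 0;

        // Subtract what the removed snapshot's replication charged per type:
        // chooseStorageTypes(3) -> [SSD, DISK, DISK] under the assumed policy.
        ssdDelta -= sizeNoRep;
        diskDelta -= 2 * sizeNoRep;

        // Add back what the current replication still charges:
        // chooseStorageTypes(2) -> [SSD, DISK].
        ssdDelta += sizeNoRep;
        diskDelta += sizeNoRep;

        // Net effect: SSD unchanged, one DISK replica's worth reclaimed.
        System.out.println("SSD delta: " + ssdDelta);   // 0
        System.out.println("DISK delta: " + diskDelta); // -1048576
      }
    }
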
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
index ba7c43c..c3b7523 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
@@ -234,7 +234,8 @@ public class SnapshotManager implements SnapshotStatsMXBean {
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
       throws IOException {
     INodeDirectory srcRoot = getSnapshottableRoot(iip);
-    srcRoot.removeSnapshot(snapshotName, collectedBlocks, removedINodes);
+    srcRoot.removeSnapshot(fsdir.getBlockStoragePolicySuite(), snapshotName,
+        collectedBlocks, removedINodes);
     numSnapshots.getAndDecrement();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
index 8a8e61f..18f4bd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
@@ -50,6 +50,14 @@ public class EnumCounters<E extends Enum<E>> {
     this.enumClass = enumClass;
     this.counters = new long[enumConstants.length];
   }
+
+  public EnumCounters(final Class<E> enumClass, long defaultVal) {
+    final E[] enumConstants = enumClass.getEnumConstants();
+    Preconditions.checkNotNull(enumConstants);
+    this.enumClass = enumClass;
+    this.counters = new long[enumConstants.length];
+    reset(defaultVal);
+  }
   
   /** @return the value of counter e. */
   public final long get(final E e) {
@@ -77,9 +85,7 @@ public class EnumCounters<E extends Enum<E>> {
 
   /** Reset all counters to zero. */
   public final void reset() {
-    for(int i = 0; i < counters.length; i++) {
-      this.counters[i] = 0L;
-    }
+    reset(0L);
   }
 
   /** Add the given value to counter e. */
@@ -143,6 +149,30 @@ public class EnumCounters<E extends Enum<E>> {
     return b.substring(0, b.length() - 2);
   }
 
+  public final void reset(long val) {
+    for(int i = 0; i < counters.length; i++) {
+      this.counters[i] = val;
+    }
+  }
+
+  public boolean allLessOrEqual(long val) {
+    for (long c : counters) {
+      if (c > val) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public boolean anyGreaterOrEqual(long val) {
+    for (long c: counters) {
+      if (c >= val) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * A factory for creating counters.
    * 

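The helpers added to EnumCounters give quota code a compact vocabulary: a counter set pre-filled with a sentinel default, plus bulk comparisons over all counters. A usage sketch, assuming the patched org.apache.hadoop.hdfs.util.EnumCounters is on the classpath and using a local Media enum in place of StorageType:

    import org.apache.hadoop.hdfs.util.EnumCounters;

    public class EnumCountersSketch {
      enum Media { SSD, DISK, ARCHIVE }

      public static void main(String[] args) {
        // New constructor: every counter starts at the given default,
        // e.g. -1 as a "quota not set" sentinel.
        EnumCounters<Media> quota = new EnumCounters<Media>(Media.class, -1L);
        quota.set(Media.SSD, 4096L);

        EnumCounters<Media> used = new EnumCounters<Media>(Media.class); // all zero
        used.add(Media.SSD, 1024L);

        System.out.println(quota.allLessOrEqual(-1L));  // false: the SSD quota is set
        System.out.println(used.anyGreaterOrEqual(1L)); // true: SSD has usage to check
      }
    }
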
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
index 643a034..3bd1d91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
@@ -141,6 +141,15 @@ message INodeSection {
     optional uint32 storagePolicyID = 10;
   }
 
+  message QuotaByStorageTypeEntryProto {
+    required StorageTypeProto storageType = 1;
+    required uint64 quota = 2;
+  }
+
+  message QuotaByStorageTypeFeatureProto {
+    repeated QuotaByStorageTypeEntryProto quotas = 1;
+  }
+
   message INodeDirectory {
     optional uint64 modificationTime = 1;
     // namespace quota
@@ -150,6 +159,7 @@ message INodeSection {
     optional fixed64 permission = 4;
     optional AclFeatureProto acl = 5;
     optional XAttrFeatureProto xAttrs = 6;
+    optional QuotaByStorageTypeFeatureProto typeQuotas = 7;
   }
 
   message INodeSymlink {

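For reference, a sketch of how the new messages look through the protoc-generated Java API. The outer class name FsImageProto follows Hadoop's convention for fsimage.proto; treat the exact generated names here as assumptions:

    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
    import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;

    public class TypeQuotaProtoSketch {
      public static void main(String[] args) {
        // One repeated entry per storage type that has a quota set.
        INodeSection.QuotaByStorageTypeFeatureProto typeQuotas =
            INodeSection.QuotaByStorageTypeFeatureProto.newBuilder()
                .addQuotas(INodeSection.QuotaByStorageTypeEntryProto.newBuilder()
                    .setStorageType(StorageTypeProto.SSD)
                    .setQuota(4L * 1024 * 1024))
                .build();

        // The feature hangs off INodeDirectory as the new optional field 7;
        // nsQuota and dsQuota keep their -1 "not set" convention.
        INodeSection.INodeDirectory dirCopy = INodeSection.INodeDirectory.newBuilder()
            .setNsQuota(-1)
            .setDsQuota(-1)
            .setTypeQuotas(typeQuotas)
            .build();
        System.out.println(dirCopy.hasTypeQuotas()); // true
      }
    }
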
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index e8fc662..f47bd24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1184,6 +1184,8 @@ public class DFSTestUtil {
     // OP_SET_QUOTA 14
     filesystem.setQuota(pathDirectoryMkdir, 1000L, 
         HdfsConstants.QUOTA_DONT_SET);
+    // OP_SET_QUOTA_BY_STORAGETYPE
+    filesystem.setQuotaByStorageType(pathDirectoryMkdir, StorageType.SSD, 888L);
     // OP_RENAME 15
     fc.rename(pathFileCreate, pathFileMoved, Rename.NONE);
     // OP_CONCAT_DELETE 16

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index 4f449d1..6e91e06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -72,7 +72,7 @@ public class TestDFSInotifyEventInputStream {
    */
   @Test
   public void testOpcodeCount() {
-    Assert.assertEquals(49, FSEditLogOpCodes.values().length);
+    Assert.assertEquals(50, FSEditLogOpCodes.values().length);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
index d3f122f..445c1e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
@@ -554,7 +554,7 @@ public abstract class FSImageTestUtil {
    * get NameSpace quota.
    */
   public static long getNSQuota(FSNamesystem ns) {
-    return ns.dir.rootDir.getQuotaCounts().get(Quota.NAMESPACE);
+    return ns.dir.rootDir.getQuotaCounts().getNameSpace();
   }
   
  public static void assertNNFilesMatch(MiniDFSCluster cluster) throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java
index fad8807..ede2f89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java
@@ -81,10 +81,10 @@ public class TestDiskspaceQuotaUpdate {
     INode fnode = fsdir.getINode4Write(foo.toString());
     assertTrue(fnode.isDirectory());
     assertTrue(fnode.isQuotaSet());
-    Quota.Counts cnt = fnode.asDirectory().getDirectoryWithQuotaFeature()
+    QuotaCounts cnt = fnode.asDirectory().getDirectoryWithQuotaFeature()
         .getSpaceConsumed();
-    assertEquals(2, cnt.get(Quota.NAMESPACE));
-    assertEquals(fileLen * REPLICATION, cnt.get(Quota.DISKSPACE));
+    assertEquals(2, cnt.getNameSpace());
+    assertEquals(fileLen * REPLICATION, cnt.getDiskSpace());
   }
 
   /**
@@ -105,10 +105,10 @@ public class TestDiskspaceQuotaUpdate {
 
    INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
     assertTrue(fooNode.isQuotaSet());
-    Quota.Counts quota = fooNode.getDirectoryWithQuotaFeature()
+    QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
         .getSpaceConsumed();
-    long ns = quota.get(Quota.NAMESPACE);
-    long ds = quota.get(Quota.DISKSPACE);
+    long ns = quota.getNameSpace();
+    long ds = quota.getDiskSpace();
     assertEquals(2, ns); // foo and bar
     assertEquals(currentFileLen * REPLICATION, ds);
     ContentSummary c = dfs.getContentSummary(foo);
@@ -119,8 +119,8 @@ public class TestDiskspaceQuotaUpdate {
     currentFileLen += BLOCKSIZE;
 
     quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
-    ns = quota.get(Quota.NAMESPACE);
-    ds = quota.get(Quota.DISKSPACE);
+    ns = quota.getNameSpace();
+    ds = quota.getDiskSpace();
     assertEquals(2, ns); // foo and bar
     assertEquals(currentFileLen * REPLICATION, ds);
     c = dfs.getContentSummary(foo);
@@ -131,8 +131,8 @@ public class TestDiskspaceQuotaUpdate {
     currentFileLen += (BLOCKSIZE * 3 + BLOCKSIZE / 8);
 
     quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
-    ns = quota.get(Quota.NAMESPACE);
-    ds = quota.get(Quota.DISKSPACE);
+    ns = quota.getNameSpace();
+    ds = quota.getDiskSpace();
     assertEquals(2, ns); // foo and bar
     assertEquals(currentFileLen * REPLICATION, ds);
     c = dfs.getContentSummary(foo);
@@ -156,10 +156,10 @@ public class TestDiskspaceQuotaUpdate {
         .of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
 
    INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
-    Quota.Counts quota = fooNode.getDirectoryWithQuotaFeature()
+    QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
         .getSpaceConsumed();
-    long ns = quota.get(Quota.NAMESPACE);
-    long ds = quota.get(Quota.DISKSPACE);
+    long ns = quota.getNameSpace();
+    long ds = quota.getDiskSpace();
     assertEquals(2, ns); // foo and bar
    assertEquals(BLOCKSIZE * 2 * REPLICATION, ds); // file is under construction
 
@@ -168,8 +168,8 @@ public class TestDiskspaceQuotaUpdate {
 
     fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
     quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
-    ns = quota.get(Quota.NAMESPACE);
-    ds = quota.get(Quota.DISKSPACE);
+    ns = quota.getNameSpace();
+    ds = quota.getDiskSpace();
     assertEquals(2, ns);
     assertEquals((BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, ds);
 
@@ -177,8 +177,8 @@ public class TestDiskspaceQuotaUpdate {
     DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE);
 
     quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
-    ns = quota.get(Quota.NAMESPACE);
-    ds = quota.get(Quota.DISKSPACE);
+    ns = quota.getNameSpace();
+    ds = quota.getDiskSpace();
     assertEquals(2, ns); // foo and bar
     assertEquals((BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION, ds);
   }

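The mechanical migration running through these tests: the enum-indexed Quota.Counts gives way to QuotaCounts, which has named getters, a builder, and room for per-type usage. In sketch form, assuming the patched hadoop-hdfs classes are on the classpath:

    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;

    public class QuotaCountsMigrationSketch {
      public static void main(String[] args) {
        // Old style (removed by this patch):
        //   Quota.Counts cnt = Quota.Counts.newInstance();
        //   long ns = cnt.get(Quota.NAMESPACE);
        //   long ds = cnt.get(Quota.DISKSPACE);

        // New style, as used throughout the updated tests:
        QuotaCounts cnt = new QuotaCounts.Builder().build();
        long ns = cnt.getNameSpace();
        long ds = cnt.getDiskSpace();
        // Per-type usage rides along in the same object:
        long ssd = cnt.getTypeSpaces().get(StorageType.SSD);
        System.out.println(ns + " " + ds + " " + ssd);
      }
    }
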
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
index 8b829e4..741dd10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
@@ -158,7 +158,7 @@ public class TestFSImageWithSnapshot {
     fsn.getFSDirectory().writeLock();
     try {
       loader.load(imageFile, false);
-      FSImage.updateCountForQuota(
+      FSImage.updateCountForQuota(fsn.getBlockManager().getStoragePolicySuite(),
           INodeDirectory.valueOf(fsn.getFSDirectory().getINode("/"), "/"));
     } finally {
       fsn.getFSDirectory().writeUnlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaByStorageType.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaByStorageType.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaByStorageType.java
new file mode 100644
index 0000000..b0d5d87
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaByStorageType.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestQuotaByStorageType {
+
+  private static final int BLOCKSIZE = 1024;
+  private static final short REPLICATION = 3;
+  static final long seed = 0L;
+  private static final Path dir = new Path("/TestQuotaByStorageType");
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FSDirectory fsdir;
+  private DistributedFileSystem dfs;
+  private FSNamesystem fsn;
+
+  protected static final Log LOG = LogFactory.getLog(TestQuotaByStorageType.class);
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+
+    // Setup a 3-node cluster and configure
+    // each node with 1 SSD and 1 DISK without capacity limitation
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(REPLICATION)
+        .storageTypes(new StorageType[]{StorageType.SSD, DEFAULT})
+        .build();
+    cluster.waitActive();
+
+    fsdir = cluster.getNamesystem().getFSDirectory();
+    dfs = cluster.getFileSystem();
+    fsn = cluster.getNamesystem();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeWithFileCreateOneSSD() throws Exception {
+    testQuotaByStorageTypeWithFileCreateCase(
+        HdfsConstants.ONESSD_STORAGE_POLICY_NAME,
+        StorageType.SSD,
+        (short)1);
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeWithFileCreateAllSSD() throws Exception {
+    testQuotaByStorageTypeWithFileCreateCase(
+        HdfsConstants.ALLSSD_STORAGE_POLICY_NAME,
+        StorageType.SSD,
+        (short)3);
+  }
+
+  void testQuotaByStorageTypeWithFileCreateCase(
+      String storagePolicy, StorageType storageType, short replication) throws Exception {
+    final Path foo = new Path(dir, "foo");
+    Path createdFile1 = new Path(foo, "created_file1.data");
+    dfs.mkdirs(foo);
+
+    // set storage policy on directory "foo" to storagePolicy
+    dfs.setStoragePolicy(foo, storagePolicy);
+
+    // set quota by storage type on directory "foo"
+    dfs.setQuotaByStorageType(foo, storageType, BLOCKSIZE * 10);
+
+    INode fnode = fsdir.getINode4Write(foo.toString());
+    assertTrue(fnode.isDirectory());
+    assertTrue(fnode.isQuotaSet());
+
+    // Create file of size 2.5 * BLOCKSIZE under directory "foo"
+    long file1Len = BLOCKSIZE * 2 + BLOCKSIZE / 2;
+    int bufLen = BLOCKSIZE / 16;
+    DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE, REPLICATION, seed);
+
+    // Verify space consumed and remaining quota
+    long storageTypeConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(storageType);
+    assertEquals(file1Len * replication, storageTypeConsumed);
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeWithFileCreateAppend() throws Exception {
+    final Path foo = new Path(dir, "foo");
+    Path createdFile1 = new Path(foo, "created_file1.data");
+    dfs.mkdirs(foo);
+
+    // set storage policy on directory "foo" to ONESSD
+    dfs.setStoragePolicy(foo, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+
+    // set quota by storage type on directory "foo"
+    dfs.setQuotaByStorageType(foo, StorageType.SSD, BLOCKSIZE * 4);
+    INode fnode = fsdir.getINode4Write(foo.toString());
+    assertTrue(fnode.isDirectory());
+    assertTrue(fnode.isQuotaSet());
+
+    // Create file of size 2 * BLOCKSIZE under directory "foo"
+    long file1Len = BLOCKSIZE * 2;
+    int bufLen = BLOCKSIZE / 16;
+    DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE, REPLICATION, seed);
+
+    // Verify space consumed and remaining quota
+    long ssdConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, ssdConsumed);
+
+    // append several blocks
+    int appendLen = BLOCKSIZE * 2;
+    DFSTestUtil.appendFile(dfs, createdFile1, appendLen);
+    file1Len += appendLen;
+
+    ssdConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, ssdConsumed);
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeWithFileCreateDelete() throws Exception {
+    final Path foo = new Path(dir, "foo");
+    Path createdFile1 = new Path(foo, "created_file1.data");
+    dfs.mkdirs(foo);
+
+    dfs.setStoragePolicy(foo, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+
+    // set quota by storage type on directory "foo"
+    dfs.setQuotaByStorageType(foo, StorageType.SSD, BLOCKSIZE * 10);
+    INode fnode = fsdir.getINode4Write(foo.toString());
+    assertTrue(fnode.isDirectory());
+    assertTrue(fnode.isQuotaSet());
+
+    // Create file of size 2.5 * BLOCKSIZE under directory "foo"
+    long file1Len = BLOCKSIZE * 2 + BLOCKSIZE / 2;
+    int bufLen = BLOCKSIZE / 16;
+    DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE, REPLICATION, seed);
+
+    // Verify space consumed and remaining quota
+    long storageTypeConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, storageTypeConsumed);
+
+    // Delete file and verify the consumed space of the storage type is updated
+    dfs.delete(createdFile1, false);
+    storageTypeConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(0, storageTypeConsumed);
+
+    QuotaCounts counts = new QuotaCounts.Builder().build();
+    fnode.computeQuotaUsage(fsn.getBlockManager().getStoragePolicySuite(), counts, true);
+    assertEquals(fnode.dumpTreeRecursively().toString(), 0,
+        counts.getTypeSpaces().get(StorageType.SSD));
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeWithFileCreateRename() throws Exception {
+    final Path foo = new Path(dir, "foo");
+    dfs.mkdirs(foo);
+    Path createdFile1foo = new Path(foo, "created_file1.data");
+
+    final Path bar = new Path(dir, "bar");
+    dfs.mkdirs(bar);
+    Path createdFile1bar = new Path(bar, "created_file1.data");
+
+    // set storage policy on directory "foo" and "bar" to ONESSD
+    dfs.setStoragePolicy(foo, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+    dfs.setStoragePolicy(bar, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+
+    // set quota by storage type on directory "foo"
+    dfs.setQuotaByStorageType(foo, StorageType.SSD, BLOCKSIZE * 4);
+    dfs.setQuotaByStorageType(bar, StorageType.SSD, BLOCKSIZE * 2);
+
+    INode fnode = fsdir.getINode4Write(foo.toString());
+    assertTrue(fnode.isDirectory());
+    assertTrue(fnode.isQuotaSet());
+
+    // Create file of size 3 * BLOCKSIZE under directory "foo"
+    long file1Len = BLOCKSIZE * 3;
+    int bufLen = BLOCKSIZE / 16;
+    DFSTestUtil.createFile(dfs, createdFile1foo, bufLen, file1Len, BLOCKSIZE, REPLICATION, seed);
+
+    // Verify space consumed and remaining quota
+    long ssdConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, ssdConsumed);
+
+    // move file from foo to bar
+    try {
+      dfs.rename(createdFile1foo, createdFile1bar);
+      fail("Should have failed with QuotaByStorageTypeExceededException ");
+    } catch (Throwable t) {
+      LOG.info("Got expected exception ", t);
+    }
+  }
+
+  /**
+   * Test that the quota is correctly updated even when file creation fails
+   * with a QuotaByStorageTypeExceededException.
+   */
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeExceptionWithFileCreate() throws Exception {
+    final Path foo = new Path(dir, "foo");
+    Path createdFile1 = new Path(foo, "created_file1.data");
+    dfs.mkdirs(foo);
+
+    dfs.setStoragePolicy(foo, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+    dfs.setQuotaByStorageType(foo, StorageType.SSD, BLOCKSIZE * 4);
+
+    INode fnode = fsdir.getINode4Write(foo.toString());
+    assertTrue(fnode.isDirectory());
+    assertTrue(fnode.isQuotaSet());
+
+    // Create the 1st file of size 2 * BLOCKSIZE under directory "foo" and expect no exception
+    long file1Len = BLOCKSIZE * 2;
+    int bufLen = BLOCKSIZE / 16;
+    DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE, REPLICATION, seed);
+    long currentSSDConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, currentSSDConsumed);
+
+    // Create the 2nd file of size 1.5 * BLOCKSIZE under directory "foo" and expect no exception
+    Path createdFile2 = new Path(foo, "created_file2.data");
+    long file2Len = BLOCKSIZE + BLOCKSIZE / 2;
+    DFSTestUtil.createFile(dfs, createdFile2, bufLen, file2Len, BLOCKSIZE, REPLICATION, seed);
+    currentSSDConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+
+    assertEquals(file1Len + file2Len, currentSSDConsumed);
+
+    // Create the 3rd file of size BLOCKSIZE under directory "foo" and expect quota exceeded exception
+    Path createdFile3 = new Path(foo, "created_file3.data");
+    long file3Len = BLOCKSIZE;
+
+    try {
+      DFSTestUtil.createFile(dfs, createdFile3, bufLen, file3Len, BLOCKSIZE, REPLICATION, seed);
+      fail("Should have failed with QuotaByStorageTypeExceededException ");
+    } catch (Throwable t) {
+      LOG.info("Got expected exception ", t);
+
+      currentSSDConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+          .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+      assertEquals(file1Len + file2Len, currentSSDConsumed);
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeParentOffChildOff() throws Exception {
+    final Path parent = new Path(dir, "parent");
+    final Path child = new Path(parent, "child");
+    dfs.mkdirs(parent);
+    dfs.mkdirs(child);
+
+    dfs.setStoragePolicy(parent, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+
+    // Create file of size 2.5 * BLOCKSIZE under child directory.
+    // Since both parent and child directory do not have SSD quota set,
+    // expect succeed without exception
+    Path createdFile1 = new Path(child, "created_file1.data");
+    long file1Len = BLOCKSIZE * 2 + BLOCKSIZE / 2;
+    int bufLen = BLOCKSIZE / 16;
+    DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE,
+        REPLICATION, seed);
+
+    // Verify SSD usage at the root level as both parent/child don't have DirectoryWithQuotaFeature
+    INode fnode = fsdir.getINode4Write("/");
+    long ssdConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, ssdConsumed);
+
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeParentOffChildOn() throws Exception {
+    final Path parent = new Path(dir, "parent");
+    final Path child = new Path(parent, "child");
+    dfs.mkdirs(parent);
+    dfs.mkdirs(child);
+
+    dfs.setStoragePolicy(parent, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+    dfs.setQuotaByStorageType(child, StorageType.SSD, 2 * BLOCKSIZE);
+
+    // Create file of size 2.5 * BLOCKSIZE under child directory
+    // Since child directory have SSD quota of 2 * BLOCKSIZE,
+    // expect an exception when creating files under child directory.
+    Path createdFile1 = new Path(child, "created_file1.data");
+    long file1Len = BLOCKSIZE * 2 + BLOCKSIZE / 2;
+    int bufLen = BLOCKSIZE / 16;
+    try {
+      DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE,
+          REPLICATION, seed);
+      fail("Should have failed with QuotaByStorageTypeExceededException ");
+    } catch (Throwable t) {
+      LOG.info("Got expected exception ", t);
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeParentOnChildOff() throws Exception {
+    short replication = 1;
+    final Path parent = new Path(dir, "parent");
+    final Path child = new Path(parent, "child");
+    dfs.mkdirs(parent);
+    dfs.mkdirs(child);
+
+    dfs.setStoragePolicy(parent, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+    dfs.setQuotaByStorageType(parent, StorageType.SSD, 3 * BLOCKSIZE);
+
+    // Create file of size 2.5 * BLOCKSIZE under child directory
+    // Verify parent Quota applies
+    Path createdFile1 = new Path(child, "created_file1.data");
+    long file1Len = BLOCKSIZE * 2 + BLOCKSIZE / 2;
+    int bufLen = BLOCKSIZE / 16;
+    DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE,
+        replication, seed);
+
+    INode fnode = fsdir.getINode4Write(parent.toString());
+    assertTrue(fnode.isDirectory());
+    assertTrue(fnode.isQuotaSet());
+    long currentSSDConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, currentSSDConsumed);
+
+    // Create the 2nd file of size BLOCKSIZE under child directory and expect quota exceeded exception
+    Path createdFile2 = new Path(child, "created_file2.data");
+    long file2Len = BLOCKSIZE;
+
+    try {
+      DFSTestUtil.createFile(dfs, createdFile2, bufLen, file2Len, BLOCKSIZE, replication, seed);
+      fail("Should have failed with QuotaByStorageTypeExceededException ");
+    } catch (Throwable t) {
+      LOG.info("Got expected exception ", t);
+      currentSSDConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+          .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+      assertEquals(file1Len, currentSSDConsumed);
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeParentOnChildOn() throws Exception {
+    final Path parent = new Path(dir, "parent");
+    final Path child = new Path(parent, "child");
+    dfs.mkdirs(parent);
+    dfs.mkdirs(child);
+
+    dfs.setStoragePolicy(parent, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+    dfs.setQuotaByStorageType(parent, StorageType.SSD, 2 * BLOCKSIZE);
+    dfs.setQuotaByStorageType(child, StorageType.SSD, 3 * BLOCKSIZE);
+
+    // Create file of size 2.5 * BLOCKSIZE under child directory
+    // Verify parent Quota applies
+    Path createdFile1 = new Path(child, "created_file1.data");
+    long file1Len = BLOCKSIZE * 2 + BLOCKSIZE / 2;
+    int bufLen = BLOCKSIZE / 16;
+    try {
+      DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE,
+          REPLICATION, seed);
+      fail("Should have failed with QuotaByStorageTypeExceededException ");
+    } catch (Throwable t) {
+      LOG.info("Got expected exception ", t);
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeWithTraditionalQuota() throws Exception {
+    final Path foo = new Path(dir, "foo");
+    dfs.mkdirs(foo);
+    dfs.setQuota(foo, Long.MAX_VALUE - 1, REPLICATION * BLOCKSIZE * 10);
+    INode fnode = fsdir.getINode4Write(foo.toString());
+    assertTrue(fnode.isDirectory());
+    assertTrue(fnode.isQuotaSet());
+
+    Path createdFile = new Path(foo, "created_file.data");
+    long fileLen = BLOCKSIZE * 2 + BLOCKSIZE / 2;
+    DFSTestUtil.createFile(dfs, createdFile, BLOCKSIZE / 16,
+        fileLen, BLOCKSIZE, REPLICATION, seed);
+
+    QuotaCounts cnt = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed();
+    assertEquals(2, cnt.getNameSpace());
+    assertEquals(fileLen * REPLICATION, cnt.getDiskSpace());
+
+    dfs.delete(createdFile, true);
+
+    QuotaCounts cntAfterDelete = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed();
+    assertEquals(1, cntAfterDelete.getNameSpace());
+    assertEquals(0, cntAfterDelete.getDiskSpace());
+
+    // Validate the computeQuotaUsage()
+    QuotaCounts counts = new QuotaCounts.Builder().build();
+    fnode.computeQuotaUsage(fsn.getBlockManager().getStoragePolicySuite(), counts, true);
+    assertEquals(fnode.dumpTreeRecursively().toString(), 1,
+        counts.getNameSpace());
+    assertEquals(fnode.dumpTreeRecursively().toString(), 0,
+        counts.getDiskSpace());
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeWithSnapshot() throws Exception {
+    final Path sub1 = new Path(dir, "Sub1");
+    dfs.mkdirs(sub1);
+
+    // Setup ONE_SSD policy and SSD quota of 4 * BLOCKSIZE on sub1
+    dfs.setStoragePolicy(sub1, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+    dfs.setQuotaByStorageType(sub1, StorageType.SSD, 4 * BLOCKSIZE);
+
+    INode sub1Node = fsdir.getINode4Write(sub1.toString());
+    assertTrue(sub1Node.isDirectory());
+    assertTrue(sub1Node.isQuotaSet());
+
+    // Create file1 of size 2 * BLOCKSIZE under sub1
+    Path file1 = new Path(sub1, "file1");
+    long file1Len = 2 * BLOCKSIZE;
+    DFSTestUtil.createFile(dfs, file1, file1Len, REPLICATION, seed);
+
+    // Create snapshot on sub1 named s1
+    SnapshotTestHelper.createSnapshot(dfs, sub1, "s1");
+
+    // Verify sub1 SSD usage is unchanged after creating snapshot s1
+    long ssdConsumed = sub1Node.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, ssdConsumed);
+
+    // Delete file1
+    dfs.delete(file1, false);
+
+    // Verify sub1 SSD usage is unchanged due to the existence of snapshot s1
+    ssdConsumed = sub1Node.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, ssdConsumed);
+
+    QuotaCounts counts1 = new QuotaCounts.Builder().build();
+    sub1Node.computeQuotaUsage(fsn.getBlockManager().getStoragePolicySuite(), counts1, true);
+    assertEquals(sub1Node.dumpTreeRecursively().toString(), file1Len,
+        counts1.getTypeSpaces().get(StorageType.SSD));
+
+    // Delete the snapshot s1
+    dfs.deleteSnapshot(sub1, "s1");
+
+    // Verify sub1 SSD usage is fully reclaimed and changed to 0
+    ssdConsumed = sub1Node.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(0, ssdConsumed);
+
+    QuotaCounts counts2 = new QuotaCounts.Builder().build();
+    sub1Node.computeQuotaUsage(fsn.getBlockManager().getStoragePolicySuite(), counts2, true);
+    assertEquals(sub1Node.dumpTreeRecursively().toString(), 0,
+        counts2.getTypeSpaces().get(StorageType.SSD));
+  }
+
+  @Test(timeout = 60000)
+  public void testQuotaByStorageTypeWithFileCreateTruncate() throws Exception {
+    final Path foo = new Path(dir, "foo");
+    Path createdFile1 = new Path(foo, "created_file1.data");
+    dfs.mkdirs(foo);
+
+    // set storage policy on directory "foo" to ONESSD
+    dfs.setStoragePolicy(foo, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+
+    // set quota by storage type on directory "foo"
+    dfs.setQuotaByStorageType(foo, StorageType.SSD, BLOCKSIZE * 4);
+    INode fnode = fsdir.getINode4Write(foo.toString());
+    assertTrue(fnode.isDirectory());
+    assertTrue(fnode.isQuotaSet());
+
+    // Create file of size 2 * BLOCKSIZE under directory "foo"
+    long file1Len = BLOCKSIZE * 2;
+    int bufLen = BLOCKSIZE / 16;
+    DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE, REPLICATION, seed);
+
+    // Verify SSD consumed before truncate
+    long ssdConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(file1Len, ssdConsumed);
+
+    // Truncate file to 1 * BLOCKSIZE
+    int newFile1Len = BLOCKSIZE * 1;
+    dfs.truncate(createdFile1, newFile1Len);
+
+    // Verify SSD consumed after truncate
+    ssdConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
+    assertEquals(newFile1Len, ssdConsumed);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
index 207a4e8..9edeafd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
@@ -63,7 +63,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.ChildrenDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
@@ -1198,15 +1198,15 @@ public class TestRenameWithSnapshots {
     assertFalse(hdfs.exists(bar_s2));
     restartClusterAndCheckImage(true);
     // make sure the whole referred subtree has been destroyed
-    Quota.Counts q = fsdir.getRoot().getDirectoryWithQuotaFeature().getSpaceConsumed();
-    assertEquals(3, q.get(Quota.NAMESPACE));
-    assertEquals(0, q.get(Quota.DISKSPACE));
+    QuotaCounts q = fsdir.getRoot().getDirectoryWithQuotaFeature().getSpaceConsumed();
+    assertEquals(3, q.getNameSpace());
+    assertEquals(0, q.getDiskSpace());
     
     hdfs.deleteSnapshot(sdir1, "s1");
     restartClusterAndCheckImage(true);
-    q = fsdir.getRoot().getDirectoryWithQuotaFeature().getSpaceConsumed();  
-    assertEquals(3, q.get(Quota.NAMESPACE));
-    assertEquals(0, q.get(Quota.DISKSPACE));
+    q = fsdir.getRoot().getDirectoryWithQuotaFeature().getSpaceConsumed();
+    assertEquals(3, q.getNameSpace());
+    assertEquals(0, q.getDiskSpace());
   }
   
   /**
@@ -1600,9 +1600,9 @@ public class TestRenameWithSnapshots {
     // check dir2
    INodeDirectory dir2Node = fsdir2.getINode4Write(dir2.toString()).asDirectory();
     assertTrue(dir2Node.isSnapshottable());
-    Quota.Counts counts = dir2Node.computeQuotaUsage();
-    assertEquals(2, counts.get(Quota.NAMESPACE));
-    assertEquals(0, counts.get(Quota.DISKSPACE));
+    QuotaCounts counts = dir2Node.computeQuotaUsage(fsdir.getBlockStoragePolicySuite());
+    assertEquals(2, counts.getNameSpace());
+    assertEquals(0, counts.getDiskSpace());
     childrenList = ReadOnlyList.Util.asList(dir2Node.asDirectory()
         .getChildrenList(Snapshot.CURRENT_STATE_ID));
     assertEquals(1, childrenList.size());
@@ -1674,9 +1674,9 @@ public class TestRenameWithSnapshots {
     // check dir2
    INodeDirectory dir2Node = fsdir2.getINode4Write(dir2.toString()).asDirectory();
     assertTrue(dir2Node.isSnapshottable());
-    Quota.Counts counts = dir2Node.computeQuotaUsage();
-    assertEquals(3, counts.get(Quota.NAMESPACE));
-    assertEquals(0, counts.get(Quota.DISKSPACE));
+    QuotaCounts counts = dir2Node.computeQuotaUsage(fsdir.getBlockStoragePolicySuite());
+    assertEquals(3, counts.getNameSpace());
+    assertEquals(0, counts.getDiskSpace());
     childrenList = ReadOnlyList.Util.asList(dir2Node.asDirectory()
         .getChildrenList(Snapshot.CURRENT_STATE_ID));
     assertEquals(1, childrenList.size());
@@ -1790,9 +1790,10 @@ public class TestRenameWithSnapshots {
     // check dir2
     INode dir2Node = fsdir.getINode4Write(dir2.toString());
     assertTrue(dir2Node.asDirectory().isSnapshottable());
-    Quota.Counts counts = dir2Node.computeQuotaUsage();
-    assertEquals(4, counts.get(Quota.NAMESPACE));
-    assertEquals(BLOCKSIZE * REPL * 2, counts.get(Quota.DISKSPACE));
+    QuotaCounts counts = dir2Node.computeQuotaUsage(
+        fsdir.getBlockStoragePolicySuite());
+    assertEquals(4, counts.getNameSpace());
+    assertEquals(BLOCKSIZE * REPL * 2, counts.getDiskSpace());
   }
   
   @Test
@@ -1958,12 +1959,12 @@ public class TestRenameWithSnapshots {
     // check
     final INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
         .asDirectory();
-    Quota.Counts q1 = dir1Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
-    assertEquals(3, q1.get(Quota.NAMESPACE));
+    QuotaCounts q1 = dir1Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
+    assertEquals(3, q1.getNameSpace());
     final INodeDirectory dir2Node = fsdir.getINode4Write(sdir2.toString())
         .asDirectory();
-    Quota.Counts q2 = dir2Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
-    assertEquals(1, q2.get(Quota.NAMESPACE));
+    QuotaCounts q2 = dir2Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
+    assertEquals(1, q2.getNameSpace());
     
     final Path foo_s1 = SnapshotTestHelper.getSnapshotPath(sdir1, "s1",
         foo.getName());
@@ -2028,12 +2029,12 @@ public class TestRenameWithSnapshots {
     final INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
         .asDirectory();
     // sdir1 + s1 + foo_s1 (foo) + foo (foo + s1 + bar~bar3)
-    Quota.Counts q1 = dir1Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
-    assertEquals(7, q1.get(Quota.NAMESPACE));
+    QuotaCounts q1 = dir1Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
+    assertEquals(7, q1.getNameSpace());
     final INodeDirectory dir2Node = fsdir.getINode4Write(sdir2.toString())
         .asDirectory();
-    Quota.Counts q2 = dir2Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
-    assertEquals(1, q2.get(Quota.NAMESPACE));
+    QuotaCounts q2 = dir2Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
+    assertEquals(1, q2.getNameSpace());
     
     final Path foo_s1 = SnapshotTestHelper.getSnapshotPath(sdir1, "s1",
         foo.getName());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
index bc4ec90..c494322 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -203,17 +203,17 @@ public class TestSnapshotDeletion {
       final long expectedNs, final long expectedDs) throws IOException {
     INodeDirectory dirNode = getDir(fsdir, dirPath);
     assertTrue(dirNode.isQuotaSet());
-    Quota.Counts q = dirNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
+    QuotaCounts q = dirNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
     assertEquals(dirNode.dumpTreeRecursively().toString(), expectedNs,
-        q.get(Quota.NAMESPACE));
+        q.getNameSpace());
     assertEquals(dirNode.dumpTreeRecursively().toString(), expectedDs,
-        q.get(Quota.DISKSPACE));
-    Quota.Counts counts = Quota.Counts.newInstance();
-    dirNode.computeQuotaUsage(counts, false);
+        q.getDiskSpace());
+    QuotaCounts counts = new QuotaCounts.Builder().build();
+    dirNode.computeQuotaUsage(fsdir.getBlockStoragePolicySuite(), counts, false);
     assertEquals(dirNode.dumpTreeRecursively().toString(), expectedNs,
-        counts.get(Quota.NAMESPACE));
+        counts.getNameSpace());
     assertEquals(dirNode.dumpTreeRecursively().toString(), expectedDs,
-        counts.get(Quota.DISKSPACE));
+        counts.getDiskSpace());
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
index da8c190..a32cd98 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored differ
