http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
index c493b25..943c898 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
@@ -22,16 +22,12 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -40,20 +36,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.NamespaceNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -62,7 +53,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 
@@ -72,11 +62,8 @@ import com.google.common.collect.Multimap;
 @Category(LargeTests.class)
 public class TestQuotaObserverChoreWithMiniCluster {
   private static final Log LOG = 
LogFactory.getLog(TestQuotaObserverChoreWithMiniCluster.class);
-  private static final int SIZE_PER_VALUE = 256;
-  private static final String F1 = "f1";
   private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
   private static final AtomicLong COUNTER = new AtomicLong(0);
-  private static final long ONE_MEGABYTE = 1024L * 1024L;
   private static final long DEFAULT_WAIT_MILLIS = 500;
 
   @Rule
@@ -84,18 +71,19 @@ public class TestQuotaObserverChoreWithMiniCluster {
 
   private HMaster master;
   private QuotaObserverChore chore;
-  private SpaceQuotaViolationNotifierForTest violationNotifier;
+  private SpaceQuotaSnapshotNotifierForTest snapshotNotifier;
+  private SpaceQuotaHelperForTests helper;
 
   @BeforeClass
   public static void setUp() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 
1000);
     conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 
1000);
-    conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000);
-    conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000);
+    conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
+    conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
     conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
-    conf.setClass(SpaceQuotaViolationNotifierFactory.VIOLATION_NOTIFIER_KEY,
-        SpaceQuotaViolationNotifierForTest.class, 
SpaceQuotaViolationNotifier.class);
+    conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY,
+        SpaceQuotaSnapshotNotifierForTest.class, 
SpaceQuotaSnapshotNotifier.class);
     TEST_UTIL.startMiniCluster(1);
   }
 
@@ -131,40 +119,55 @@ public class TestQuotaObserverChoreWithMiniCluster {
     }
 
     master = TEST_UTIL.getMiniHBaseCluster().getMaster();
-    violationNotifier =
-        (SpaceQuotaViolationNotifierForTest) 
master.getSpaceQuotaViolationNotifier();
-    violationNotifier.clearTableViolations();
+    snapshotNotifier =
+        (SpaceQuotaSnapshotNotifierForTest) 
master.getSpaceQuotaSnapshotNotifier();
+    snapshotNotifier.clearSnapshots();
     chore = master.getQuotaObserverChore();
+    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
   }
 
   @Test
   public void testTableViolatesQuota() throws Exception {
-    TableName tn = createTableWithRegions(10);
+    TableName tn = helper.createTableWithRegions(10);
 
-    final long sizeLimit = 2L * ONE_MEGABYTE;
+    final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
     final SpaceViolationPolicy violationPolicy = 
SpaceViolationPolicy.NO_INSERTS;
     QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, 
sizeLimit, violationPolicy);
     TEST_UTIL.getAdmin().setQuota(settings);
 
     // Write more data than should be allowed
-    writeData(tn, 3L * ONE_MEGABYTE);
-
-    Map<TableName,SpaceViolationPolicy> violatedQuotas = 
violationNotifier.snapshotTablesInViolation();
-    while (violatedQuotas.isEmpty()) {
-      LOG.info("Found no violated quotas, sleeping and retrying. Current 
reports: "
-          + master.getMasterQuotaManager().snapshotRegionSizes());
-      try {
-        Thread.sleep(DEFAULT_WAIT_MILLIS);
-      } catch (InterruptedException e) {
-        LOG.debug("Interrupted while sleeping.", e);
-        Thread.currentThread().interrupt();
+    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
+
+    Map<TableName,SpaceQuotaSnapshot> quotaSnapshots = 
snapshotNotifier.copySnapshots();
+    boolean foundSnapshot = false;
+    while (!foundSnapshot) {
+      if (quotaSnapshots.isEmpty()) {
+        LOG.info("Found no violated quotas, sleeping and retrying. Current 
reports: "
+            + master.getMasterQuotaManager().snapshotRegionSizes());
+        sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
+        quotaSnapshots = snapshotNotifier.copySnapshots();
+      } else {
+        Entry<TableName,SpaceQuotaSnapshot> entry = 
Iterables.getOnlyElement(quotaSnapshots.entrySet());
+        assertEquals(tn, entry.getKey());
+        final SpaceQuotaSnapshot snapshot = entry.getValue();
+        if (!snapshot.getQuotaStatus().isInViolation()) {
+          LOG.info("Found a snapshot, but it was not yet in violation. " + 
snapshot);
+          sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
+          quotaSnapshots = snapshotNotifier.copySnapshots();
+        } else {
+          foundSnapshot = true;
+        }
       }
-      violatedQuotas = violationNotifier.snapshotTablesInViolation();
     }
 
-    Entry<TableName,SpaceViolationPolicy> entry = 
Iterables.getOnlyElement(violatedQuotas.entrySet());
+    Entry<TableName,SpaceQuotaSnapshot> entry = 
Iterables.getOnlyElement(quotaSnapshots.entrySet());
     assertEquals(tn, entry.getKey());
-    assertEquals(violationPolicy, entry.getValue());
+    final SpaceQuotaSnapshot snapshot = entry.getValue();
+    assertEquals("Snapshot was " + snapshot, violationPolicy, 
snapshot.getQuotaStatus().getPolicy());
+    assertEquals(sizeLimit, snapshot.getLimit());
+    assertTrue(
+        "The usage should be greater than the limit, but were " + 
snapshot.getUsage() + " and "
+        + snapshot.getLimit() + ", respectively", snapshot.getUsage() > 
snapshot.getLimit());
   }
 
   @Test
@@ -179,18 +182,18 @@ public class TestQuotaObserverChoreWithMiniCluster {
       admin.createNamespace(desc);
     }
 
-    TableName tn1 = createTableWithRegions(namespace, 5);
-    TableName tn2 = createTableWithRegions(namespace, 5);
-    TableName tn3 = createTableWithRegions(namespace, 5);
+    TableName tn1 = helper.createTableWithRegions(namespace, 5);
+    TableName tn2 = helper.createTableWithRegions(namespace, 5);
+    TableName tn3 = helper.createTableWithRegions(namespace, 5);
 
-    final long sizeLimit = 5L * ONE_MEGABYTE;
+    final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
     final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE;
     QuotaSettings settings = 
QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy);
     admin.setQuota(settings);
 
-    writeData(tn1, 2L * ONE_MEGABYTE);
+    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
     admin.flush(tn1);
-    Map<TableName,SpaceViolationPolicy> violatedQuotas = 
violationNotifier.snapshotTablesInViolation();
+    Map<TableName,SpaceQuotaSnapshot> violatedQuotas = 
snapshotNotifier.copySnapshots();
     for (int i = 0; i < 5; i++) {
       // Check a few times to make sure we don't prematurely move to violation
       assertEquals("Should not see any quota violations after writing 2MB of 
data", 0, violatedQuotas.size());
@@ -199,12 +202,12 @@ public class TestQuotaObserverChoreWithMiniCluster {
       } catch (InterruptedException e) {
         LOG.debug("Interrupted while sleeping." , e);
       }
-      violatedQuotas = violationNotifier.snapshotTablesInViolation();
+      violatedQuotas = snapshotNotifier.copySnapshots();
     }
 
-    writeData(tn2, 2L * ONE_MEGABYTE);
+    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
     admin.flush(tn2);
-    violatedQuotas = violationNotifier.snapshotTablesInViolation();
+    violatedQuotas = snapshotNotifier.copySnapshots();
     for (int i = 0; i < 5; i++) {
       // Check a few times to make sure we don't prematurely move to violation
       assertEquals("Should not see any quota violations after writing 4MB of 
data", 0,
@@ -214,14 +217,14 @@ public class TestQuotaObserverChoreWithMiniCluster {
       } catch (InterruptedException e) {
         LOG.debug("Interrupted while sleeping." , e);
       }
-      violatedQuotas = violationNotifier.snapshotTablesInViolation();
+      violatedQuotas = snapshotNotifier.copySnapshots();
     }
 
     // Writing the final 2MB of data will push the namespace over the 5MB 
limit (6MB in total)
     // and should push all three tables in the namespace into violation.
-    writeData(tn3, 2L * ONE_MEGABYTE);
+    helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
     admin.flush(tn3);
-    violatedQuotas = violationNotifier.snapshotTablesInViolation();
+    violatedQuotas = snapshotNotifier.copySnapshots();
     while (violatedQuotas.size() < 3) {
       LOG.debug("Saw fewer violations than desired (expected 3): " + 
violatedQuotas
           + ". Current reports: " + 
master.getMasterQuotaManager().snapshotRegionSizes());
@@ -231,18 +234,18 @@ public class TestQuotaObserverChoreWithMiniCluster {
         LOG.debug("Interrupted while sleeping.", e);
         Thread.currentThread().interrupt();
       }
-      violatedQuotas = violationNotifier.snapshotTablesInViolation();
+      violatedQuotas = snapshotNotifier.copySnapshots();
     }
 
-    SpaceViolationPolicy vp1 = violatedQuotas.remove(tn1);
-    assertNotNull("tn1 should be in violation", vp1);
-    assertEquals(violationPolicy, vp1);
-    SpaceViolationPolicy vp2 = violatedQuotas.remove(tn2);
-    assertNotNull("tn2 should be in violation", vp2);
-    assertEquals(violationPolicy, vp2);
-    SpaceViolationPolicy vp3 = violatedQuotas.remove(tn3);
-    assertNotNull("tn3 should be in violation", vp3);
-    assertEquals(violationPolicy, vp3);
+    SpaceQuotaSnapshot snapshot1 = violatedQuotas.remove(tn1);
+    assertNotNull("tn1 should be in violation", snapshot1);
+    assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy());
+    SpaceQuotaSnapshot snapshot2 = violatedQuotas.remove(tn2);
+    assertNotNull("tn2 should be in violation", snapshot2);
+    assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy());
+    SpaceQuotaSnapshot snapshot3 = violatedQuotas.remove(tn3);
+    assertNotNull("tn3 should be in violation", snapshot3);
+    assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy());
     assertTrue("Unexpected additional quota violations: " + violatedQuotas, 
violatedQuotas.isEmpty());
   }
 
@@ -258,18 +261,18 @@ public class TestQuotaObserverChoreWithMiniCluster {
       admin.createNamespace(desc);
     }
 
-    TableName tn1 = createTableWithRegions(namespace, 5);
-    TableName tn2 = createTableWithRegions(namespace, 5);
+    TableName tn1 = helper.createTableWithRegions(namespace, 5);
+    TableName tn2 = helper.createTableWithRegions(namespace, 5);
 
-    final long namespaceSizeLimit = 3L * ONE_MEGABYTE;
+    final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
     final SpaceViolationPolicy namespaceViolationPolicy = 
SpaceViolationPolicy.DISABLE;
     QuotaSettings namespaceSettings = 
QuotaSettingsFactory.limitNamespaceSpace(namespace,
         namespaceSizeLimit, namespaceViolationPolicy);
     admin.setQuota(namespaceSettings);
 
-    writeData(tn1, 2L * ONE_MEGABYTE);
+    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
     admin.flush(tn1);
-    Map<TableName,SpaceViolationPolicy> violatedQuotas = 
violationNotifier.snapshotTablesInViolation();
+    Map<TableName,SpaceQuotaSnapshot> violatedQuotas = 
snapshotNotifier.copySnapshots();
     for (int i = 0; i < 5; i++) {
       // Check a few times to make sure we don't prematurely move to violation
       assertEquals("Should not see any quota violations after writing 2MB of 
data", 0,
@@ -279,12 +282,12 @@ public class TestQuotaObserverChoreWithMiniCluster {
       } catch (InterruptedException e) {
         LOG.debug("Interrupted while sleeping." , e);
       }
-      violatedQuotas = violationNotifier.snapshotTablesInViolation();
+      violatedQuotas = snapshotNotifier.copySnapshots();
     }
 
-    writeData(tn2, 2L * ONE_MEGABYTE);
+    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
     admin.flush(tn2);
-    violatedQuotas = violationNotifier.snapshotTablesInViolation();
+    violatedQuotas = snapshotNotifier.copySnapshots();
     while (violatedQuotas.size() < 2) {
       LOG.debug("Saw fewer violations than desired (expected 2): " + 
violatedQuotas
           + ". Current reports: " + 
master.getMasterQuotaManager().snapshotRegionSizes());
@@ -294,18 +297,18 @@ public class TestQuotaObserverChoreWithMiniCluster {
         LOG.debug("Interrupted while sleeping.", e);
         Thread.currentThread().interrupt();
       }
-      violatedQuotas = violationNotifier.snapshotTablesInViolation();
+      violatedQuotas = snapshotNotifier.copySnapshots();
     }
 
-    SpaceViolationPolicy actualPolicyTN1 = violatedQuotas.get(tn1);
+    SpaceQuotaSnapshot actualPolicyTN1 = violatedQuotas.get(tn1);
     assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1);
-    assertEquals(namespaceViolationPolicy, actualPolicyTN1);
-    SpaceViolationPolicy actualPolicyTN2 = violatedQuotas.get(tn2);
+    assertEquals(namespaceViolationPolicy, 
actualPolicyTN1.getQuotaStatus().getPolicy());
+    SpaceQuotaSnapshot actualPolicyTN2 = violatedQuotas.get(tn2);
     assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
-    assertEquals(namespaceViolationPolicy, actualPolicyTN2);
+    assertEquals(namespaceViolationPolicy, 
actualPolicyTN2.getQuotaStatus().getPolicy());
 
     // Override the namespace quota with a table quota
-    final long tableSizeLimit = ONE_MEGABYTE;
+    final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE;
     final SpaceViolationPolicy tableViolationPolicy = 
SpaceViolationPolicy.NO_INSERTS;
     QuotaSettings tableSettings = QuotaSettingsFactory.limitTableSpace(tn1, 
tableSizeLimit,
         tableViolationPolicy);
@@ -313,10 +316,10 @@ public class TestQuotaObserverChoreWithMiniCluster {
 
     // Keep checking for the table quota policy to override the namespace quota
     while (true) {
-      violatedQuotas = violationNotifier.snapshotTablesInViolation();
-      SpaceViolationPolicy actualTableViolationPolicy = 
violatedQuotas.get(tn1);
-      assertNotNull("Violation policy should never be null", 
actualTableViolationPolicy);
-      if (tableViolationPolicy != actualTableViolationPolicy) {
+      violatedQuotas = snapshotNotifier.copySnapshots();
+      SpaceQuotaSnapshot actualTableSnapshot = violatedQuotas.get(tn1);
+      assertNotNull("Violation policy should never be null", 
actualTableSnapshot);
+      if (tableViolationPolicy != 
actualTableSnapshot.getQuotaStatus().getPolicy()) {
         LOG.debug("Saw unexpected table violation policy, waiting and 
re-checking.");
         try {
           Thread.sleep(DEFAULT_WAIT_MILLIS);
@@ -326,23 +329,23 @@ public class TestQuotaObserverChoreWithMiniCluster {
         }
         continue;
       }
-      assertEquals(tableViolationPolicy, actualTableViolationPolicy);
+      assertEquals(tableViolationPolicy, 
actualTableSnapshot.getQuotaStatus().getPolicy());
       break;
     }
 
     // This should not change with the introduction of the table quota for tn1
     actualPolicyTN2 = violatedQuotas.get(tn2);
     assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
-    assertEquals(namespaceViolationPolicy, actualPolicyTN2);
+    assertEquals(namespaceViolationPolicy, 
actualPolicyTN2.getQuotaStatus().getPolicy());
   }
 
   @Test
   public void testGetAllTablesWithQuotas() throws Exception {
-    final Multimap<TableName, QuotaSettings> quotas = 
createTablesWithSpaceQuotas();
+    final Multimap<TableName, QuotaSettings> quotas = 
helper.createTablesWithSpaceQuotas();
     Set<TableName> tablesWithQuotas = new HashSet<>();
     Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
     // Partition the tables with quotas by table and ns quota
-    partitionTablesByQuotaTarget(quotas, tablesWithQuotas, 
namespaceTablesWithQuotas);
+    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, 
namespaceTablesWithQuotas);
 
     TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
     assertEquals("Found tables: " + tables, tablesWithQuotas, 
tables.getTableQuotaTables());
@@ -351,13 +354,13 @@ public class TestQuotaObserverChoreWithMiniCluster {
 
   @Test
   public void testRpcQuotaTablesAreFiltered() throws Exception {
-    final Multimap<TableName, QuotaSettings> quotas = 
createTablesWithSpaceQuotas();
+    final Multimap<TableName, QuotaSettings> quotas = 
helper.createTablesWithSpaceQuotas();
     Set<TableName> tablesWithQuotas = new HashSet<>();
     Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
     // Partition the tables with quotas by table and ns quota
-    partitionTablesByQuotaTarget(quotas, tablesWithQuotas, 
namespaceTablesWithQuotas);
+    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, 
namespaceTablesWithQuotas);
 
-    TableName rpcQuotaTable = createTable();
+    TableName rpcQuotaTable = helper.createTable();
     TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory
       .throttleTable(rpcQuotaTable, ThrottleType.READ_NUMBER, 6, 
TimeUnit.MINUTES));
 
@@ -375,7 +378,7 @@ public class TestQuotaObserverChoreWithMiniCluster {
     TablesWithQuotas tables = new TablesWithQuotas(TEST_UTIL.getConnection(),
         TEST_UTIL.getConfiguration()) {
       @Override
-      int getNumReportedRegions(TableName table, 
QuotaViolationStore<TableName> tableStore) {
+      int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> 
tableStore) {
         Integer i = mockReportedRegions.get(table);
         if (null == i) {
           return 0;
@@ -385,9 +388,9 @@ public class TestQuotaObserverChoreWithMiniCluster {
     };
 
     // Create the tables
-    TableName tn1 = createTableWithRegions(20);
-    TableName tn2 = createTableWithRegions(20);
-    TableName tn3 = createTableWithRegions(20);
+    TableName tn1 = helper.createTableWithRegions(20);
+    TableName tn2 = helper.createTableWithRegions(20);
+    TableName tn3 = helper.createTableWithRegions(20);
 
     // Add them to the Tables with Quotas object
     tables.addTableQuotaTable(tn1);
@@ -407,9 +410,9 @@ public class TestQuotaObserverChoreWithMiniCluster {
 
   @Test
   public void testFetchSpaceQuota() throws Exception {
-    Multimap<TableName,QuotaSettings> tables = createTablesWithSpaceQuotas();
+    Multimap<TableName,QuotaSettings> tables = 
helper.createTablesWithSpaceQuotas();
     // Can pass in an empty map, we're not consulting it.
-    chore.initializeViolationStores(Collections.emptyMap());
+    chore.initializeSnapshotStores(Collections.emptyMap());
     // All tables that were created should have a quota defined.
     for (Entry<TableName,QuotaSettings> entry : tables.entries()) {
       final TableName table = entry.getKey();
@@ -420,10 +423,10 @@ public class TestQuotaObserverChoreWithMiniCluster {
 
       SpaceQuota spaceQuota = null;
       if (null != qs.getTableName()) {
-        spaceQuota = chore.getTableViolationStore().getSpaceQuota(table);
+        spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table);
         assertNotNull("Could not find table space quota for " + table, 
spaceQuota);
       } else if (null != qs.getNamespace()) {
-        spaceQuota = 
chore.getNamespaceViolationStore().getSpaceQuota(table.getNamespaceAsString());
+        spaceQuota = 
chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString());
         assertNotNull("Could not find namespace space quota for " + 
table.getNamespaceAsString(), spaceQuota);
       } else {
         fail("Expected table or namespace space quota");
@@ -433,166 +436,22 @@ public class TestQuotaObserverChoreWithMiniCluster {
       assertEquals(sls.getProto().getQuota(), spaceQuota);
     }
 
-    TableName tableWithoutQuota = createTable();
-    
assertNull(chore.getTableViolationStore().getSpaceQuota(tableWithoutQuota));
+    TableName tableWithoutQuota = helper.createTable();
+    assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota));
   }
 
-  //
-  // Helpers
-  //
-
-  void writeData(TableName tn, long sizeInBytes) throws IOException {
-    final Connection conn = TEST_UTIL.getConnection();
-    final Table table = conn.getTable(tn);
+  /**
+   * Sleeps for the given number of milliseconds. If the thread is interrupted while sleeping,
+   * the interruption is logged and the thread's interrupt flag is restored.
+   *
+   * @param millis how long to sleep, in milliseconds
+   */
+  private void sleepWithInterrupt(long millis) {
     try {
-      List<Put> updates = new ArrayList<>();
-      long bytesToWrite = sizeInBytes;
-      long rowKeyId = 0L;
-      final StringBuilder sb = new StringBuilder();
-      final Random r = new Random();
-      while (bytesToWrite > 0L) {
-        sb.setLength(0);
-        sb.append(Long.toString(rowKeyId));
-        // Use the reverse counter as the rowKey to get even spread across all 
regions
-        Put p = new Put(Bytes.toBytes(sb.reverse().toString()));
-        byte[] value = new byte[SIZE_PER_VALUE];
-        r.nextBytes(value);
-        p.addColumn(Bytes.toBytes(F1), Bytes.toBytes("q1"), value);
-        updates.add(p);
-
-        // Batch 50K worth of updates
-        if (updates.size() > 50) {
-          table.put(updates);
-          updates.clear();
-        }
-
-        // Just count the value size, ignore the size of rowkey + column
-        bytesToWrite -= SIZE_PER_VALUE;
-        rowKeyId++;
-      }
-
-      // Write the final batch
-      if (!updates.isEmpty()) {
-        table.put(updates);
-      }
-
-      LOG.debug("Data was written to HBase");
-      // Push the data to disk.
-      TEST_UTIL.getAdmin().flush(tn);
-      LOG.debug("Data flushed to disk");
-    } finally {
-      table.close();
-    }
-  }
-
-  Multimap<TableName, QuotaSettings> createTablesWithSpaceQuotas() throws 
Exception {
-    final Admin admin = TEST_UTIL.getAdmin();
-    final Multimap<TableName, QuotaSettings> tablesWithQuotas = 
HashMultimap.create();
-
-    final TableName tn1 = createTable();
-    final TableName tn2 = createTable();
-
-    NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + 
COUNTER.getAndIncrement()).build();
-    admin.createNamespace(nd);
-    final TableName tn3 = createTableInNamespace(nd);
-    final TableName tn4 = createTableInNamespace(nd);
-    final TableName tn5 = createTableInNamespace(nd);
-
-    final long sizeLimit1 = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB
-    final SpaceViolationPolicy violationPolicy1 = 
SpaceViolationPolicy.NO_WRITES;
-    QuotaSettings qs1 = QuotaSettingsFactory.limitTableSpace(tn1, sizeLimit1, 
violationPolicy1);
-    tablesWithQuotas.put(tn1, qs1);
-    admin.setQuota(qs1);
-
-    final long sizeLimit2 = 1024L * 1024L * 1024L * 200L; // 200GB
-    final SpaceViolationPolicy violationPolicy2 = 
SpaceViolationPolicy.NO_WRITES_COMPACTIONS;
-    QuotaSettings qs2 = QuotaSettingsFactory.limitTableSpace(tn2, sizeLimit2, 
violationPolicy2);
-    tablesWithQuotas.put(tn2, qs2);
-    admin.setQuota(qs2);
-
-    final long sizeLimit3 = 1024L * 1024L * 1024L * 1024L * 100L; // 100TB
-    final SpaceViolationPolicy violationPolicy3 = 
SpaceViolationPolicy.NO_INSERTS;
-    QuotaSettings qs3 = QuotaSettingsFactory.limitNamespaceSpace(nd.getName(), 
sizeLimit3, violationPolicy3);
-    tablesWithQuotas.put(tn3, qs3);
-    tablesWithQuotas.put(tn4, qs3);
-    tablesWithQuotas.put(tn5, qs3);
-    admin.setQuota(qs3);
-
-    final long sizeLimit4 = 1024L * 1024L * 1024L * 5L; // 5GB
-    final SpaceViolationPolicy violationPolicy4 = 
SpaceViolationPolicy.NO_INSERTS;
-    QuotaSettings qs4 = QuotaSettingsFactory.limitTableSpace(tn5, sizeLimit4, 
violationPolicy4);
-    // Override the ns quota for tn5, import edge-case to catch table quota 
taking
-    // precedence over ns quota.
-    tablesWithQuotas.put(tn5, qs4);
-    admin.setQuota(qs4);
-
-    return tablesWithQuotas;
-  }
-
-  TableName createTable() throws Exception {
-    return createTableWithRegions(1);
-  }
-
-  TableName createTableWithRegions(int numRegions) throws Exception {
-    return 
createTableWithRegions(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR, 
numRegions);
-  }
-
-  TableName createTableWithRegions(String namespace, int numRegions) throws 
Exception {
-    final Admin admin = TEST_UTIL.getAdmin();
-    final TableName tn = TableName.valueOf(namespace, testName.getMethodName() 
+ COUNTER.getAndIncrement());
-
-    // Delete the old table
-    if (admin.tableExists(tn)) {
-      admin.disableTable(tn);
-      admin.deleteTable(tn);
-    }
-
-    // Create the table
-    HTableDescriptor tableDesc = new HTableDescriptor(tn);
-    tableDesc.addFamily(new HColumnDescriptor(F1));
-    if (numRegions == 1) {
-      admin.createTable(tableDesc);
-    } else {
-      admin.createTable(tableDesc, Bytes.toBytes("0"), Bytes.toBytes("9"), 
numRegions);
-    }
-    return tn;
-  }
-
-  TableName createTableInNamespace(NamespaceDescriptor nd) throws Exception {
-    final Admin admin = TEST_UTIL.getAdmin();
-    final TableName tn = TableName.valueOf(nd.getName(),
-        testName.getMethodName() + COUNTER.getAndIncrement());
-
-    // Delete the old table
-    if (admin.tableExists(tn)) {
-      admin.disableTable(tn);
-      admin.deleteTable(tn);
-    }
-
-    // Create the table
-    HTableDescriptor tableDesc = new HTableDescriptor(tn);
-    tableDesc.addFamily(new HColumnDescriptor(F1));
-
-    admin.createTable(tableDesc);
-    return tn;
-  }
-
-  void partitionTablesByQuotaTarget(Multimap<TableName,QuotaSettings> quotas,
-      Set<TableName> tablesWithTableQuota, Set<TableName> 
tablesWithNamespaceQuota) {
-    // Partition the tables with quotas by table and ns quota
-    for (Entry<TableName, QuotaSettings> entry : quotas.entries()) {
-      SpaceLimitSettings settings = (SpaceLimitSettings) entry.getValue();
-      TableName tn = entry.getKey();
-      if (null != settings.getTableName()) {
-        tablesWithTableQuota.add(tn);
-      }
-      if (null != settings.getNamespace()) {
-        tablesWithNamespaceQuota.add(tn);
-      }
-
-      if (null == settings.getTableName() && null == settings.getNamespace()) {
-        fail("Unexpected table name with null tableName and namespace: " + tn);
-      }
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping", e);
+      Thread.currentThread().interrupt();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
index 55f671a..f10cdef 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
@@ -201,26 +202,30 @@ public class TestQuotaTableUtil {
   @Test
   public void testSerDeViolationPolicies() throws Exception {
     final TableName tn1 = getUniqueTableName();
-    final SpaceViolationPolicy policy1 = SpaceViolationPolicy.DISABLE;
+    final SpaceQuotaSnapshot snapshot1 = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 512L, 1024L);
     final TableName tn2 = getUniqueTableName();
-    final SpaceViolationPolicy policy2 = SpaceViolationPolicy.NO_INSERTS;
+    final SpaceQuotaSnapshot snapshot2 = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 512L, 1024L);
     final TableName tn3 = getUniqueTableName();
-    final SpaceViolationPolicy policy3 = SpaceViolationPolicy.NO_WRITES;
+    final SpaceQuotaSnapshot snapshot3 = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 512L, 1024L);
     List<Put> puts = new ArrayList<>();
-    puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn1, policy1));
-    puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn2, policy2));
-    puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn3, policy3));
-    final Map<TableName,SpaceViolationPolicy> expectedPolicies = new 
HashMap<>();
-    expectedPolicies.put(tn1, policy1);
-    expectedPolicies.put(tn2, policy2);
-    expectedPolicies.put(tn3, policy3);
-
-    final Map<TableName,SpaceViolationPolicy> actualPolicies = new HashMap<>();
+    puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn1, snapshot1));
+    puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn2, snapshot2));
+    puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn3, snapshot3));
+    final Map<TableName,SpaceQuotaSnapshot> expectedPolicies = new HashMap<>();
+    expectedPolicies.put(tn1, snapshot1);
+    expectedPolicies.put(tn2, snapshot2);
+    expectedPolicies.put(tn3, snapshot3);
+
+    final Map<TableName,SpaceQuotaSnapshot> actualPolicies = new HashMap<>();
     try (Table quotaTable = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
       quotaTable.put(puts);
-      ResultScanner scanner = 
quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan());
-      for (Result r : scanner) {
-        QuotaTableUtil.extractViolationPolicy(r, actualPolicies);
-      }
-      scanner.close();
+      // try-with-resources so the scanner is closed even if extraction throws
+      try (ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
+        for (Result r : scanner) {
+          QuotaTableUtil.extractQuotaSnapshot(r, actualPolicies);
+        }
+      }
     }
@@ -231,4 +236,4 @@ public class TestQuotaTableUtil {
   private TableName getUniqueTableName() {
     return TableName.valueOf(testName.getMethodName() + "_" + 
tableNameCounter++);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
index e5ab317..38656e8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
@@ -16,33 +16,29 @@
  */
 package org.apache.hadoop.hbase.quotas;
 
-import static org.apache.hadoop.hbase.util.Bytes.toBytes;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
+import 
org.apache.hadoop.hbase.quotas.policies.DisableTableViolationPolicyEnforcement;
+import 
org.apache.hadoop.hbase.quotas.policies.NoInsertsViolationPolicyEnforcement;
+import 
org.apache.hadoop.hbase.quotas.policies.NoWritesCompactionsViolationPolicyEnforcement;
+import 
org.apache.hadoop.hbase.quotas.policies.NoWritesViolationPolicyEnforcement;
+import 
org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 /**
  * Test class for {@link RegionServerSpaceQuotaManager}.
@@ -51,77 +47,105 @@ import org.mockito.stubbing.Answer;
 public class TestRegionServerSpaceQuotaManager {
 
   private RegionServerSpaceQuotaManager quotaManager;
-  private Connection conn;
-  private Table quotaTable;
-  private ResultScanner scanner;
+  private RegionServerServices rss;
 
   @Before
-  @SuppressWarnings("unchecked")
   public void setup() throws Exception {
     quotaManager = mock(RegionServerSpaceQuotaManager.class);
-    conn = mock(Connection.class);
-    quotaTable = mock(Table.class);
-    scanner = mock(ResultScanner.class);
-    // Call the real getViolationPoliciesToEnforce()
-    when(quotaManager.getViolationPoliciesToEnforce()).thenCallRealMethod();
-    // Mock out creating a scanner
-    when(quotaManager.getConnection()).thenReturn(conn);
-    when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
-    when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
-    // Mock out the static method call with some indirection
-    doAnswer(new Answer<Void>(){
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        Result result = invocation.getArgumentAt(0, Result.class);
-        Map<TableName,SpaceViolationPolicy> policies = 
invocation.getArgumentAt(1, Map.class);
-        QuotaTableUtil.extractViolationPolicy(result, policies);
-        return null;
-      }
-    }).when(quotaManager).extractViolationPolicy(any(Result.class), 
any(Map.class));
+    rss = mock(RegionServerServices.class);
   }
 
   @Test
-  public void testMissingAllColumns() {
-    List<Result> results = new ArrayList<>();
-    results.add(Result.create(Collections.emptyList()));
-    when(scanner.iterator()).thenReturn(results.iterator());
-    try {
-      quotaManager.getViolationPoliciesToEnforce();
-      fail("Expected an IOException, but did not receive one.");
-    } catch (IOException e) {
-      // Expected an error because we had no cells in the row.
-      // This should only happen due to programmer error.
-    }
+  public void testSpacePoliciesFromEnforcements() {
+    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = new 
HashMap<>();
+    final Map<TableName, SpaceQuotaSnapshot> expectedPolicies = new 
HashMap<>();
+    when(quotaManager.copyActiveEnforcements()).thenReturn(enforcements);
+    when(quotaManager.getActivePoliciesAsMap()).thenCallRealMethod();
+
+    NoInsertsViolationPolicyEnforcement noInsertsPolicy = new 
NoInsertsViolationPolicyEnforcement();
+    SpaceQuotaSnapshot noInsertsSnapshot = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 256L, 1024L);
+    noInsertsPolicy.initialize(rss, TableName.valueOf("no_inserts"), 
noInsertsSnapshot);
+    enforcements.put(noInsertsPolicy.getTableName(), noInsertsPolicy);
+    expectedPolicies.put(noInsertsPolicy.getTableName(), noInsertsSnapshot);
+
+    NoWritesViolationPolicyEnforcement noWritesPolicy = new 
NoWritesViolationPolicyEnforcement();
+    SpaceQuotaSnapshot noWritesSnapshot = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 512L, 2048L);
+    noWritesPolicy.initialize(rss, TableName.valueOf("no_writes"), 
noWritesSnapshot);
+    enforcements.put(noWritesPolicy.getTableName(), noWritesPolicy);
+    expectedPolicies.put(noWritesPolicy.getTableName(), noWritesSnapshot);
+
+    NoWritesCompactionsViolationPolicyEnforcement noWritesCompactionsPolicy =
+        new NoWritesCompactionsViolationPolicyEnforcement();
+    SpaceQuotaSnapshot noWritesCompactionsSnapshot = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 
1024L, 4096L);
+    noWritesCompactionsPolicy.initialize(
+        rss, TableName.valueOf("no_writes_compactions"), 
noWritesCompactionsSnapshot);
+    enforcements.put(noWritesCompactionsPolicy.getTableName(), 
noWritesCompactionsPolicy);
+    expectedPolicies.put(noWritesCompactionsPolicy.getTableName(),
+        noWritesCompactionsSnapshot);
+
+    DisableTableViolationPolicyEnforcement disablePolicy =
+        new DisableTableViolationPolicyEnforcement();
+    SpaceQuotaSnapshot disableSnapshot = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 2048L, 8192L);
+    disablePolicy.initialize(rss, TableName.valueOf("disable"), 
disableSnapshot);
+    enforcements.put(disablePolicy.getTableName(), disablePolicy);
+    expectedPolicies.put(disablePolicy.getTableName(), disableSnapshot);
+
+    enforcements.put(
+        TableName.valueOf("no_policy"), new 
BulkLoadVerifyingViolationPolicyEnforcement());
+
+    Map<TableName, SpaceQuotaSnapshot> actualPolicies = 
quotaManager.getActivePoliciesAsMap();
+    assertEquals(expectedPolicies, actualPolicies);
   }
 
   @Test
-  public void testMissingDesiredColumn() {
-    List<Result> results = new ArrayList<>();
-    // Give a column that isn't the one we want
-    Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("q"), 
toBytes("s"), new byte[0]);
-    results.add(Result.create(Collections.singletonList(c)));
-    when(scanner.iterator()).thenReturn(results.iterator());
-    try {
-      quotaManager.getViolationPoliciesToEnforce();
-      fail("Expected an IOException, but did not receive one.");
-    } catch (IOException e) {
-      // Expected an error because we were missing the column we expected in 
this row.
-      // This should only happen due to programmer error.
-    }
+  public void testExceptionOnPolicyEnforcementEnable() throws Exception {
+    final TableName tableName = TableName.valueOf("foo");
+    final SpaceQuotaSnapshot snapshot = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 2048L);
+    RegionServerServices rss = mock(RegionServerServices.class);
+    SpaceViolationPolicyEnforcementFactory factory = mock(
+        SpaceViolationPolicyEnforcementFactory.class);
+    SpaceViolationPolicyEnforcement enforcement = 
mock(SpaceViolationPolicyEnforcement.class);
+    RegionServerSpaceQuotaManager realManager = new 
RegionServerSpaceQuotaManager(rss, factory);
+
+    when(factory.create(rss, tableName, snapshot)).thenReturn(enforcement);
+    doThrow(new IOException("Failed for test!")).when(enforcement).enable();
+
+    realManager.enforceViolationPolicy(tableName, snapshot);
+    Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
+        realManager.copyActiveEnforcements();
+    assertTrue("Expected active enforcements to be empty, but were " + 
enforcements,
+        enforcements.isEmpty());
   }
 
   @Test
-  public void testParsingError() {
-    List<Result> results = new ArrayList<>();
-    Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("u"), 
toBytes("v"), new byte[0]);
-    results.add(Result.create(Collections.singletonList(c)));
-    when(scanner.iterator()).thenReturn(results.iterator());
-    try {
-      quotaManager.getViolationPoliciesToEnforce();
-      fail("Expected an IOException, but did not receive one.");
-    } catch (IOException e) {
-      // We provided a garbage serialized protobuf message (empty byte array), 
this should
-      // in turn throw an IOException
-    }
+  public void testExceptionOnPolicyEnforcementDisable() throws Exception {
+    final TableName tableName = TableName.valueOf("foo");
+    final SpaceQuotaSnapshot snapshot = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 2048L);
+    RegionServerServices rss = mock(RegionServerServices.class);
+    SpaceViolationPolicyEnforcementFactory factory = mock(
+        SpaceViolationPolicyEnforcementFactory.class);
+    SpaceViolationPolicyEnforcement enforcement = 
mock(SpaceViolationPolicyEnforcement.class);
+    RegionServerSpaceQuotaManager realManager = new 
RegionServerSpaceQuotaManager(rss, factory);
+
+    when(factory.create(rss, tableName, snapshot)).thenReturn(enforcement);
+    doNothing().when(enforcement).enable();
+    doThrow(new IOException("Failed for test!")).when(enforcement).disable();
+
+    // Enabling should work
+    realManager.enforceViolationPolicy(tableName, snapshot);
+    Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
+        realManager.copyActiveEnforcements();
+    assertEquals(1, enforcements.size());
+
+    // If the disable fails, we should still treat it as "active"
+    realManager.disableViolationPolicyEnforcement(tableName);
+    enforcements = realManager.copyActiveEnforcements();
+    assertEquals(1, enforcements.size());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
index 160de46..7f0f9ad 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
@@ -16,20 +16,34 @@
  */
 package org.apache.hadoop.hbase.quotas;
 
+import static org.apache.hadoop.hbase.util.Bytes.toBytes;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Before;
@@ -37,42 +51,62 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
- * Test class for {@link SpaceQuotaViolationPolicyRefresherChore}.
+ * Test class for {@link SpaceQuotaRefresherChore}.
  */
 @Category(SmallTests.class)
 public class TestSpaceQuotaViolationPolicyRefresherChore {
 
   private RegionServerSpaceQuotaManager manager;
   private RegionServerServices rss;
-  private SpaceQuotaViolationPolicyRefresherChore chore;
+  private SpaceQuotaRefresherChore chore;
   private Configuration conf;
+  private Connection conn;
 
+  @SuppressWarnings("unchecked")
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     conf = HBaseConfiguration.create();
     rss = mock(RegionServerServices.class);
     manager = mock(RegionServerSpaceQuotaManager.class);
+    conn = mock(Connection.class);
     when(manager.getRegionServerServices()).thenReturn(rss);
     when(rss.getConfiguration()).thenReturn(conf);
-    chore = new SpaceQuotaViolationPolicyRefresherChore(manager);
+
+
+    chore = mock(SpaceQuotaRefresherChore.class);
+    when(chore.getConnection()).thenReturn(conn);
+    when(chore.getManager()).thenReturn(manager);
+    doCallRealMethod().when(chore).chore();
+    
when(chore.isInViolation(any(SpaceQuotaSnapshot.class))).thenCallRealMethod();
+    doCallRealMethod().when(chore).extractQuotaSnapshot(any(Result.class), 
any(Map.class));
   }
 
   @Test
   public void testPoliciesAreEnforced() throws IOException {
-    final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new 
HashMap<>();
-    policiesToEnforce.put(TableName.valueOf("table1"), 
SpaceViolationPolicy.DISABLE);
-    policiesToEnforce.put(TableName.valueOf("table2"), 
SpaceViolationPolicy.NO_INSERTS);
-    policiesToEnforce.put(TableName.valueOf("table3"), 
SpaceViolationPolicy.NO_WRITES);
-    policiesToEnforce.put(TableName.valueOf("table4"), 
SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
+    // Create a number of policies that should be enforced (usage > limit)
+    final Map<TableName,SpaceQuotaSnapshot> policiesToEnforce = new 
HashMap<>();
+    policiesToEnforce.put(
+        TableName.valueOf("table1"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 512L));
+    policiesToEnforce.put(
+        TableName.valueOf("table2"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 2048L, 512L));
+    policiesToEnforce.put(
+        TableName.valueOf("table3"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 4096L, 512L));
+    policiesToEnforce.put(
+        TableName.valueOf("table4"),
+        new SpaceQuotaSnapshot(
+            new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 
8192L, 512L));
 
     // No active enforcements
-    
when(manager.getActiveViolationPolicyEnforcements()).thenReturn(Collections.emptyMap());
+    when(manager.copyQuotaSnapshots()).thenReturn(Collections.emptyMap());
     // Policies to enforce
-    
when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+    when(chore.fetchSnapshotsFromQuotaTable()).thenReturn(policiesToEnforce);
 
     chore.chore();
 
-    for (Entry<TableName,SpaceViolationPolicy> entry : 
policiesToEnforce.entrySet()) {
+    for (Entry<TableName,SpaceQuotaSnapshot> entry : 
policiesToEnforce.entrySet()) {
       // Ensure we enforce the policy
       verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
       // Don't disable any policies
@@ -82,50 +116,135 @@ public class TestSpaceQuotaViolationPolicyRefresherChore {
 
   @Test
   public void testOldPoliciesAreRemoved() throws IOException {
-    final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new 
HashMap<>();
-    policiesToEnforce.put(TableName.valueOf("table1"), 
SpaceViolationPolicy.DISABLE);
-    policiesToEnforce.put(TableName.valueOf("table2"), 
SpaceViolationPolicy.NO_INSERTS);
-
-    final Map<TableName,SpaceViolationPolicy> previousPolicies = new 
HashMap<>();
-    previousPolicies.put(TableName.valueOf("table3"), 
SpaceViolationPolicy.NO_WRITES);
-    previousPolicies.put(TableName.valueOf("table4"), 
SpaceViolationPolicy.NO_WRITES);
+    final Map<TableName,SpaceQuotaSnapshot> previousPolicies = new HashMap<>();
+    previousPolicies.put(
+        TableName.valueOf("table3"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 4096L, 512L));
+    previousPolicies.put(
+        TableName.valueOf("table4"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 8192L, 512L));
+
+    final Map<TableName,SpaceQuotaSnapshot> policiesToEnforce = new 
HashMap<>();
+    policiesToEnforce.put(
+        TableName.valueOf("table1"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 512L));
+    policiesToEnforce.put(
+        TableName.valueOf("table2"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 2048L, 512L));
+    policiesToEnforce.put(
+        TableName.valueOf("table3"),
+        new SpaceQuotaSnapshot(SpaceQuotaStatus.notInViolation(), 256L, 512L));
+    policiesToEnforce.put(
+        TableName.valueOf("table4"),
+        new SpaceQuotaSnapshot(SpaceQuotaStatus.notInViolation(), 128L, 512L));
 
     // Previously stored snapshots: table3 and table4 were in violation (NO_WRITES)
-    
when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies);
+    when(manager.copyQuotaSnapshots()).thenReturn(previousPolicies);
     // Policies to enforce
-    
when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+    when(chore.fetchSnapshotsFromQuotaTable()).thenReturn(policiesToEnforce);
 
     chore.chore();
 
-    for (Entry<TableName,SpaceViolationPolicy> entry : 
policiesToEnforce.entrySet()) {
-      verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
-    }
+    verify(manager).enforceViolationPolicy(
+        TableName.valueOf("table1"), 
policiesToEnforce.get(TableName.valueOf("table1")));
+    verify(manager).enforceViolationPolicy(
+        TableName.valueOf("table2"), 
policiesToEnforce.get(TableName.valueOf("table2")));
 
-    for (Entry<TableName,SpaceViolationPolicy> entry : 
previousPolicies.entrySet()) {
-      verify(manager).disableViolationPolicyEnforcement(entry.getKey());
-    }
+    
verify(manager).disableViolationPolicyEnforcement(TableName.valueOf("table3"));
+    
verify(manager).disableViolationPolicyEnforcement(TableName.valueOf("table4"));
   }
 
   @Test
   public void testNewPolicyOverridesOld() throws IOException {
-    final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new 
HashMap<>();
-    policiesToEnforce.put(TableName.valueOf("table1"), 
SpaceViolationPolicy.DISABLE);
-    policiesToEnforce.put(TableName.valueOf("table2"), 
SpaceViolationPolicy.NO_WRITES);
-    policiesToEnforce.put(TableName.valueOf("table3"), 
SpaceViolationPolicy.NO_INSERTS);
-
-    final Map<TableName,SpaceViolationPolicy> previousPolicies = new 
HashMap<>();
-    previousPolicies.put(TableName.valueOf("table1"), 
SpaceViolationPolicy.NO_WRITES);
+    final Map<TableName,SpaceQuotaSnapshot> policiesToEnforce = new 
HashMap<>();
+    policiesToEnforce.put(
+        TableName.valueOf("table1"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 512L));
+    policiesToEnforce.put(
+        TableName.valueOf("table2"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 2048L, 512L));
+    policiesToEnforce.put(
+        TableName.valueOf("table3"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 4096L, 512L));
+
+    final Map<TableName,SpaceQuotaSnapshot> previousPolicies = new HashMap<>();
+    previousPolicies.put(
+        TableName.valueOf("table1"),
+        new SpaceQuotaSnapshot(new 
SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 8192L, 512L));
 
     // Previously active policy: table1 was under NO_WRITES
-    
when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies);
+    when(manager.getActivePoliciesAsMap()).thenReturn(previousPolicies);
     // Policies to enforce
-    
when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+    when(chore.fetchSnapshotsFromQuotaTable()).thenReturn(policiesToEnforce);
 
     chore.chore();
 
-    for (Entry<TableName,SpaceViolationPolicy> entry : 
policiesToEnforce.entrySet()) {
+    for (Entry<TableName,SpaceQuotaSnapshot> entry : 
policiesToEnforce.entrySet()) {
       verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
     }
     verify(manager, 
never()).disableViolationPolicyEnforcement(TableName.valueOf("table1"));
   }
+
+  @Test
+  public void testMissingAllColumns() throws IOException {
+    when(chore.fetchSnapshotsFromQuotaTable()).thenCallRealMethod();
+    ResultScanner scanner = mock(ResultScanner.class);
+    Table quotaTable = mock(Table.class);
+    when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
+    when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
+
+    List<Result> results = new ArrayList<>();
+    results.add(Result.create(Collections.emptyList()));
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      chore.fetchSnapshotsFromQuotaTable();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException e) {
+      // Expected an error because we had no cells in the row.
+      // This should only happen due to programmer error.
+    }
+  }
+
+  @Test
+  public void testMissingDesiredColumn() throws IOException {
+    when(chore.fetchSnapshotsFromQuotaTable()).thenCallRealMethod();
+    ResultScanner scanner = mock(ResultScanner.class);
+    Table quotaTable = mock(Table.class);
+    when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
+    when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
+
+    List<Result> results = new ArrayList<>();
+    // Give a column that isn't the one we want
+    Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("q"), 
toBytes("s"), new byte[0]);
+    results.add(Result.create(Collections.singletonList(c)));
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      chore.fetchSnapshotsFromQuotaTable();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException e) {
+      // Expected an error because we were missing the column we expected in 
this row.
+      // This should only happen due to programmer error.
+    }
+  }
+
+  @Test
+  public void testParsingError() throws IOException {
+    when(chore.fetchSnapshotsFromQuotaTable()).thenCallRealMethod();
+    ResultScanner scanner = mock(ResultScanner.class);
+    Table quotaTable = mock(Table.class);
+    when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
+    when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
+
+    List<Result> results = new ArrayList<>();
+    Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("u"), 
toBytes("v"), new byte[0]);
+    results.add(Result.create(Collections.singletonList(c)));
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      chore.fetchSnapshotsFromQuotaTable();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException e) {
+      // We provided a garbage serialized protobuf message (empty byte array), 
this should
+      // in turn throw an IOException
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
new file mode 100644
index 0000000..ea7dcf1
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.master.HMaster;
+import 
org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * End-to-end test class for filesystem space quotas.
+ */
+@Category(LargeTests.class)
+public class TestSpaceQuotas {
+  private static final Log LOG = LogFactory.getLog(TestSpaceQuotas.class);
+  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  private static final AtomicLong COUNTER = new AtomicLong(0);
+  private static final int NUM_RETRIES = 10;
+
+  @Rule
+  public TestName testName = new TestName();
+  private SpaceQuotaHelperForTests helper;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    // Increase the frequency of some of the chores for responsiveness of the 
test
+    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 
1000);
+    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 
1000);
+    conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
+    conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
+    conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 
1000);
+    conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 
1000);
+    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void removeAllQuotas() throws Exception {
+    final Connection conn = TEST_UTIL.getConnection();
+    // Wait for the quota table to be created
+    if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
+      do {
+        LOG.debug("Quota table does not yet exist");
+        Thread.sleep(1000);
+      } while (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME));
+    } else {
+      // Or, clean up any quotas from previous test runs.
+      QuotaRetriever scanner = 
QuotaRetriever.open(TEST_UTIL.getConfiguration());
+      for (QuotaSettings quotaSettings : scanner) {
+        final String namespace = quotaSettings.getNamespace();
+        final TableName tableName = quotaSettings.getTableName();
+        if (null != namespace) {
+          LOG.debug("Deleting quota for namespace: " + namespace);
+          QuotaUtil.deleteNamespaceQuota(conn, namespace);
+        } else {
+          assert null != tableName;
+          LOG.debug("Deleting quota for table: "+ tableName);
+          QuotaUtil.deleteTableQuota(conn, tableName);
+        }
+      }
+    }
+    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
+  }
+
+  @Test
+  public void testNoInsertsWithPut() throws Exception {
+    Put p = new Put(Bytes.toBytes("to_reject"));
+    p.addColumn(
+        Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 
Bytes.toBytes("reject"));
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, p);
+  }
+
+  /**
+   * Checks that an {@link Append} is rejected once the table has exceeded a quota whose
+   * violation policy is {@code NO_INSERTS}.
+   */
+  @Test
+  public void testNoInsertsWithAppend() throws Exception {
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final Append rejectedAppend = new Append(Bytes.toBytes("to_reject"));
+    rejectedAppend.add(family, Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, rejectedAppend);
+  }
+
+  /**
+   * Checks that an {@link Increment} is rejected once the table has exceeded a quota whose
+   * violation policy is {@code NO_INSERTS}.
+   */
+  @Test
+  public void testNoInsertsWithIncrement() throws Exception {
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final Increment rejectedIncrement = new Increment(Bytes.toBytes("to_reject"));
+    rejectedIncrement.addColumn(family, Bytes.toBytes("count"), 0);
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, rejectedIncrement);
+  }
+
+  /**
+   * Checks that a {@link Delete} keeps succeeding against a table that has exceeded a
+   * {@code NO_INSERTS} quota.
+   */
+  @Test
+  public void testDeletesAfterNoInserts() throws Exception {
+    final TableName overQuotaTable = writeUntilViolation(SpaceViolationPolicy.NO_INSERTS);
+    final Delete allowedDelete = new Delete(Bytes.toBytes("should_not_be_rejected"));
+    // Repeat the delete as often as the rejection tests retry their mutation, to show the
+    // quota never blocks it. Any exception thrown by a delete propagates and fails the test.
+    int attempt = 0;
+    while (attempt < NUM_RETRIES) {
+      try (Table t = TEST_UTIL.getConnection().getTable(overQuotaTable)) {
+        t.delete(allowedDelete);
+      }
+      attempt++;
+    }
+  }
+
+  /**
+   * Checks that a {@link Put} is rejected once the table has exceeded a quota whose violation
+   * policy is {@code NO_WRITES}.
+   */
+  @Test
+  public void testNoWritesWithPut() throws Exception {
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final Put rejectedPut = new Put(Bytes.toBytes("to_reject"));
+    rejectedPut.addColumn(family, Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedPut);
+  }
+
+  /**
+   * Checks that an {@link Append} is rejected once the table has exceeded a quota whose
+   * violation policy is {@code NO_WRITES}.
+   */
+  @Test
+  public void testNoWritesWithAppend() throws Exception {
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final Append rejectedAppend = new Append(Bytes.toBytes("to_reject"));
+    rejectedAppend.add(family, Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedAppend);
+  }
+
+  /**
+   * Checks that an {@link Increment} is rejected once the table has exceeded a quota whose
+   * violation policy is {@code NO_WRITES}.
+   */
+  @Test
+  public void testNoWritesWithIncrement() throws Exception {
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final Increment rejectedIncrement = new Increment(Bytes.toBytes("to_reject"));
+    rejectedIncrement.addColumn(family, Bytes.toBytes("count"), 0);
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedIncrement);
+  }
+
+  /**
+   * Checks that a {@link Delete} is rejected once the table has exceeded a quota whose
+   * violation policy is {@code NO_WRITES}.
+   */
+  @Test
+  public void testNoWritesWithDelete() throws Exception {
+    final Delete rejectedDelete = new Delete(Bytes.toBytes("to_reject"));
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedDelete);
+  }
+
+  /**
+   * Checks that, once a table has exceeded a {@code NO_WRITES_COMPACTIONS} quota, both a major
+   * and a minor compaction request fail with a {@link DoNotRetryIOException}.
+   */
+  @Test
+  public void testNoCompactions() throws Exception {
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final Put rejectedPut = new Put(Bytes.toBytes("to_reject"));
+    rejectedPut.addColumn(family, Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    // The helper only returns normally after a write was rejected, so the policy is known to
+    // be active from here on.
+    final TableName violatingTable = writeUntilViolationAndVerifyViolation(
+        SpaceViolationPolicy.NO_WRITES_COMPACTIONS, rejectedPut);
+
+    final String notRejected = "Expected that invoking the compaction should throw an Exception";
+
+    // Major compactions should be rejected
+    boolean majorRejected = false;
+    try {
+      TEST_UTIL.getAdmin().majorCompact(violatingTable);
+    } catch (DoNotRetryIOException expected) {
+      majorRejected = true;
+    }
+    assertTrue(notRejected, majorRejected);
+
+    // Minor compactions should also be rejected.
+    boolean minorRejected = false;
+    try {
+      TEST_UTIL.getAdmin().compact(violatingTable);
+    } catch (DoNotRetryIOException expected) {
+      minorRejected = true;
+    }
+    assertTrue(notRejected, minorRejected);
+  }
+
+  /**
+   * Checks that a table which exceeded a {@code DISABLE} quota ends up disabled, and that an
+   * {@link AccessDeniedException} raised while re-enabling it mentions the violated quota.
+   */
+  @Test
+  public void testNoEnableAfterDisablePolicy() throws Exception {
+    final TableName tn = writeUntilViolation(SpaceViolationPolicy.DISABLE);
+    final Admin admin = TEST_UTIL.getAdmin();
+    // Disabling a table relies on some external action (over the other policies), so wait a bit
+    // more than the other tests. Stop polling as soon as the table is seen disabled.
+    for (int i = 0; i < NUM_RETRIES * 2; i++) {
+      if (!admin.isTableEnabled(tn)) {
+        break;
+      }
+      LOG.info(tn + " is still enabled, expecting it to be disabled. Will wait and re-check.");
+      Thread.sleep(2000);
+    }
+    assertFalse(tn + " is still enabled but it should be disabled", admin.isTableEnabled(tn));
+    // NOTE(review): nothing fails the test when enableTable() succeeds; only the content of an
+    // AccessDeniedException is checked. Confirm whether a successful enable is acceptable.
+    try {
+      admin.enableTable(tn);
+    } catch (AccessDeniedException e) {
+      String exceptionContents = StringUtils.stringifyException(e);
+      final String expectedText = "violated space quota";
+      assertTrue("Expected the exception to contain " + expectedText + ", but was: "
+          + exceptionContents, exceptionContents.contains(expectedText));
+    }
+  }
+
+  /**
+   * Checks that a bulk load into a table which has exceeded a {@code NO_WRITES} quota fails
+   * with a {@link SpaceLimitingException}.
+   */
+  @Test(timeout=120000)
+  public void testNoBulkLoadsWithNoWrites() throws Exception {
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final Put rejectedPut = new Put(Bytes.toBytes("to_reject"));
+    rejectedPut.addColumn(family, Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    final TableName violatingTable =
+        writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedPut);
+
+    // The table is now in violation. Try to do a bulk load of one file with 50 rows.
+    final ClientServiceCallable<Void> bulkLoad = generateFileToLoad(violatingTable, 1, 50);
+    final RpcRetryingCaller<Void> caller =
+        new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()).<Void> newCaller();
+    boolean rejected = false;
+    try {
+      caller.callWithRetries(bulkLoad, Integer.MAX_VALUE);
+    } catch (SpaceLimitingException e) {
+      // Pass
+      LOG.trace("Caught expected exception", e);
+      rejected = true;
+    }
+    assertTrue("Expected the bulk load call to fail!", rejected);
+  }
+
+  /**
+   * Checks that a bulk load of several files is all-or-nothing under a space quota: when the
+   * batch as a whole is rejected with a {@link SpaceLimitingException}, the table must remain
+   * empty even though a single file on its own would have fit under the limit.
+   */
+  @Test(timeout=120000)
+  public void testAtomicBulkLoadUnderQuota() throws Exception {
+    // Need to verify that if the batch of hfiles cannot be loaded, none are loaded.
+    TableName tn = helper.createTableWithRegions(10);
+
+    final long sizeLimit = 50L * SpaceQuotaHelperForTests.ONE_KILOBYTE;
+    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
+        tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS);
+    TEST_UTIL.getAdmin().setQuota(settings);
+
+    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+    RegionServerSpaceQuotaManager spaceQuotaManager = rs.getRegionServerSpaceQuotaManager();
+    Map<TableName,SpaceQuotaSnapshot> snapshots = spaceQuotaManager.copyQuotaSnapshots();
+    Map<HRegionInfo,Long> regionSizes = getReportedSizesForTable(tn);
+    // Poll every three seconds until the region server holds a snapshot for the table with a
+    // positive limit; the test-level timeout bounds this loop.
+    while (true) {
+      SpaceQuotaSnapshot snapshot = snapshots.get(tn);
+      if (null != snapshot && snapshot.getLimit() > 0) {
+        break;
+      }
+      LOG.debug(
+          "Snapshot does not yet realize quota limit: " + snapshots + ", regionsizes: " +
+          regionSizes);
+      Thread.sleep(3000);
+      snapshots = spaceQuotaManager.copyQuotaSnapshots();
+      regionSizes = getReportedSizesForTable(tn);
+    }
+    // Our quota limit should be reflected in the latest snapshot
+    SpaceQuotaSnapshot snapshot = snapshots.get(tn);
+    assertEquals(0L, snapshot.getUsage());
+    assertEquals(sizeLimit, snapshot.getLimit());
+
+    // We would also not have a "real" policy in violation. The enforcement checked for here is
+    // BulkLoadVerifyingViolationPolicyEnforcement, which the message calls the "Noop" policy.
+    ActivePolicyEnforcement activePolicies = spaceQuotaManager.getActiveEnforcements();
+    SpaceViolationPolicyEnforcement enforcement = activePolicies.getPolicyEnforcement(tn);
+    assertTrue(
+        "Expected to find Noop policy, but got " + enforcement.getClass().getSimpleName(),
+        enforcement instanceof BulkLoadVerifyingViolationPolicyEnforcement);
+
+    // Should generate two files, each of which is over 25KB each
+    ClientServiceCallable<Void> callable = generateFileToLoad(tn, 2, 500);
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    FileStatus[] files = fs.listStatus(
+        new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
+    for (FileStatus file : files) {
+      assertTrue(
+          "Expected the file, " + file.getPath() + ",  length to be larger than 25KB, but was "
+              + file.getLen(),
+          file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
+      LOG.debug(file.getPath() + " -> " + file.getLen() +"B");
+    }
+
+    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
+    RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+    try {
+      caller.callWithRetries(callable, Integer.MAX_VALUE);
+      fail("Expected the bulk load call to fail!");
+    } catch (SpaceLimitingException e) {
+      // Pass
+      LOG.trace("Caught expected exception", e);
+    }
+    // Verify that we have no data in the table because neither file should have been
+    // loaded even though one of the files could have. Both the table and the scanner are
+    // closed when the check is done, whether or not the assertion holds.
+    try (Table table = TEST_UTIL.getConnection().getTable(tn);
+        ResultScanner scanner = table.getScanner(new Scan())) {
+      assertNull("Expected no results", scanner.next());
+    }
+  }
+
+  /**
+   * Collects the region sizes currently known to the master's quota manager, restricted to
+   * regions of the given table.
+   *
+   * @param tn the table whose regions should be kept
+   * @return a new map from region to size containing only regions of {@code tn}
+   */
+  private Map<HRegionInfo,Long> getReportedSizesForTable(TableName tn) {
+    final MasterQuotaManager masterQuotas =
+        TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterQuotaManager();
+    final Map<HRegionInfo,Long> sizesForTable = new HashMap<>();
+    for (Entry<HRegionInfo,Long> report : masterQuotas.snapshotRegionSizes().entrySet()) {
+      final HRegionInfo region = report.getKey();
+      if (!region.getTable().equals(tn)) {
+        // Belongs to some other table
+        continue;
+      }
+      sizesForTable.put(region, report.getValue());
+    }
+    return sizesForTable;
+  }
+
+  /**
+   * Creates a table with a 2MB space quota using the given violation policy, writes 3MB of
+   * data to it through the test helper and then sleeps for five seconds.
+   *
+   * @param policyToViolate the violation policy to attach to the table's quota
+   * @return the name of the newly created table
+   */
+  private TableName writeUntilViolation(SpaceViolationPolicy policyToViolate) throws Exception {
+    final long quotaLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+    final long bytesToWrite = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+
+    final TableName table = helper.createTableWithRegions(10);
+    TEST_UTIL.getAdmin().setQuota(
+        QuotaSettingsFactory.limitTableSpace(table, quotaLimit, policyToViolate));
+
+    // Write more data than the limit allows
+    helper.writeData(table, bytesToWrite);
+
+    // Give the chores time to run and notice the change.
+    Thread.sleep(5000);
+
+    return table;
+  }
+
+  /**
+   * Pushes a new table over its quota via {@link #writeUntilViolation(SpaceViolationPolicy)}
+   * and then applies the given mutation, retrying up to {@code NUM_RETRIES} times with a two
+   * second pause, until it is rejected. The rejection's stringified exception must contain the
+   * policy name. If no attempt is rejected, the quota table is dumped to the log and the test
+   * fails.
+   *
+   * @param policyToViolate the violation policy to attach to the table's quota
+   * @param m the mutation expected to be rejected; must be a Put, Delete, Append or Increment
+   * @return the name of the table that is in violation
+   */
+  private TableName writeUntilViolationAndVerifyViolation(
+      SpaceViolationPolicy policyToViolate, Mutation m) throws Exception {
+    final TableName tn = writeUntilViolation(policyToViolate);
+
+    // But let's try a few times to get the exception before failing
+    boolean sawError = false;
+    for (int i = 0; i < NUM_RETRIES && !sawError; i++) {
+      try (Table table = TEST_UTIL.getConnection().getTable(tn)) {
+        if (m instanceof Put) {
+          table.put((Put) m);
+        } else if (m instanceof Delete) {
+          table.delete((Delete) m);
+        } else if (m instanceof Append) {
+          table.append((Append) m);
+        } else if (m instanceof Increment) {
+          table.increment((Increment) m);
+        } else {
+          // fail() throws an AssertionError, which the catch (Exception) below does not trap.
+          fail(
+              "Failed to apply " + m.getClass().getSimpleName() +
+              " to the table. Programming error");
+        }
+        LOG.info("Did not reject the " + m.getClass().getSimpleName() + ", will sleep and retry");
+        Thread.sleep(2000);
+      } catch (Exception e) {
+        String msg = StringUtils.stringifyException(e);
+        assertTrue("Expected exception message to contain the word '" + policyToViolate.name() +
+            "', but was " + msg, msg.contains(policyToViolate.name()));
+        sawError = true;
+      }
+    }
+    if (!sawError) {
+      // Dump the quota table to help diagnose the failure. The scanner is closed even when
+      // reading a row throws.
+      try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
+          ResultScanner scanner = quotaTable.getScanner(new Scan())) {
+        LOG.info("Dumping contents of hbase:quota table");
+        Result result;
+        while ((result = scanner.next()) != null) {
+          LOG.info(Bytes.toString(result.getRow()) + " => " + result.toString());
+        }
+      }
+    }
+    assertTrue(
+        "Expected to see an exception writing data to a table exceeding its quota", sawError);
+
+    return tn;
+  }
+
+  /**
+   * Writes {@code numFiles} HFiles under {@code <home>/<test method name>_files}, prepares a
+   * secure bulk load for the table and returns a callable that submits all of the files in a
+   * single bulk load request.
+   *
+   * @param tn the table to bulk load into
+   * @param numFiles the number of HFiles to create
+   * @param numRowsPerFile the row count passed to the HFile generator for each file
+   * @return a callable which performs the bulk load when invoked
+   */
+  private ClientServiceCallable<Void> generateFileToLoad(
+      TableName tn, int numFiles, int numRowsPerFile) throws Exception {
+    Connection conn = TEST_UTIL.getConnection();
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    Path baseDir = new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files");
+    fs.mkdirs(baseDir);
+    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
+    for (int i = 1; i <= numFiles; i++) {
+      Path hfile = new Path(baseDir, "file" + i);
+      TestHRegionServerBulkLoad.createHFile(
+          fs, hfile, Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
+          Bytes.toBytes("reject"), numRowsPerFile);
+      famPaths.add(new Pair<>(Bytes.toBytes(SpaceQuotaHelperForTests.F1), hfile.toString()));
+    }
+
+    // bulk load HFiles
+    // The table handle is only needed to obtain the bulk load token, so release it right away
+    // rather than leaking it.
+    final String bulkToken;
+    try (Table table = conn.getTable(tn)) {
+      bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn);
+    }
+    return new ClientServiceCallable<Void>(conn,
+        tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController()) {
+      @Override
+      public Void rpcCall() throws Exception {
+        byte[] regionName = getLocation().getRegionInfo().getRegionName();
+        try (Table table = conn.getTable(getTableName())) {
+          SecureBulkLoadClient secureClient = new SecureBulkLoadClient(conf, table);
+          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+                true, null, bulkToken);
+        }
+        return null;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java
index efc046b..cefed67 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
@@ -44,7 +44,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 /**
- * Test class for {@link TableQuotaViolationStore}.
+ * Test class for {@link TableQuotaSnapshotStore}.
  */
 @Category(SmallTests.class)
 public class TestTableQuotaViolationStore {
@@ -53,14 +53,14 @@ public class TestTableQuotaViolationStore {
   private Connection conn;
   private QuotaObserverChore chore;
   private Map<HRegionInfo, Long> regionReports;
-  private TableQuotaViolationStore store;
+  private TableQuotaSnapshotStore store;
 
   @Before
   public void setup() {
     conn = mock(Connection.class);
     chore = mock(QuotaObserverChore.class);
     regionReports = new HashMap<>();
-    store = new TableQuotaViolationStore(conn, chore, regionReports);
+    store = new TableQuotaSnapshotStore(conn, chore, regionReports);
   }
 
   @Test
@@ -108,23 +108,29 @@ public class TestTableQuotaViolationStore {
     regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(0), 
Bytes.toBytes(1)), 1024L * 512L);
     regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), 
Bytes.toBytes(2)), 1024L * 256L);
 
+    SpaceQuotaSnapshot tn1Snapshot = new SpaceQuotaSnapshot(
+        SpaceQuotaStatus.notInViolation(), 1024L * 768L, 1024L * 1024L);
+
     // Below the quota
-    assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(tn1, 
quota));
+    assertEquals(tn1Snapshot, store.getTargetState(tn1, quota));
 
     regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(2), 
Bytes.toBytes(3)), 1024L * 256L);
+    tn1Snapshot = new SpaceQuotaSnapshot(SpaceQuotaStatus.notInViolation(), 
1024L * 1024L, 1024L * 1024L);
 
     // Equal to the quota is still in observance
-    assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(tn1, 
quota));
+    assertEquals(tn1Snapshot, store.getTargetState(tn1, quota));
 
     regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(3), 
Bytes.toBytes(4)), 1024L);
+    tn1Snapshot = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L * 1024L + 
1024L, 1024L * 1024L);
 
     // Exceeds the quota, should be in violation
-    assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(tn1, 
quota));
+    assertEquals(tn1Snapshot, store.getTargetState(tn1, quota));
   }
 
   @Test
   public void testGetSpaceQuota() throws Exception {
-    TableQuotaViolationStore mockStore = mock(TableQuotaViolationStore.class);
+    TableQuotaSnapshotStore mockStore = mock(TableQuotaSnapshotStore.class);
     when(mockStore.getSpaceQuota(any(TableName.class))).thenCallRealMethod();
 
     Quotas quotaWithSpace = Quotas.newBuilder().setSpace(

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java
index 4a7000c..d190c8c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java
@@ -30,12 +30,11 @@ import java.util.Objects;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Before;
@@ -44,17 +43,17 @@ import org.junit.experimental.categories.Category;
 import org.mockito.ArgumentMatcher;
 
 /**
- * Test case for {@link TableSpaceQuotaViolationNotifier}.
+ * Test case for {@link TableSpaceQuotaSnapshotNotifier}.
  */
 @Category(SmallTests.class)
 public class TestTableSpaceQuotaViolationNotifier {
 
-  private TableSpaceQuotaViolationNotifier notifier;
+  private TableSpaceQuotaSnapshotNotifier notifier;
   private Connection conn;
 
   @Before
   public void setup() throws Exception {
-    notifier = new TableSpaceQuotaViolationNotifier();
+    notifier = new TableSpaceQuotaSnapshotNotifier();
     conn = mock(Connection.class);
     notifier.initialize(conn);
   }
@@ -62,35 +61,25 @@ public class TestTableSpaceQuotaViolationNotifier {
   @Test
   public void testToViolation() throws Exception {
     final TableName tn = TableName.valueOf("inviolation");
-    final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS;
+    final SpaceQuotaSnapshot snapshot = new SpaceQuotaSnapshot(
+        new SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 1024L, 512L);
     final Table quotaTable = mock(Table.class);
     
when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
 
     final Put expectedPut = new Put(Bytes.toBytes("t." + 
tn.getNameAsString()));
-    final SpaceQuota protoQuota = SpaceQuota.newBuilder()
-        .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
+    final QuotaProtos.SpaceQuotaSnapshot protoQuota = 
QuotaProtos.SpaceQuotaSnapshot.newBuilder()
+        
.setStatus(QuotaProtos.SpaceQuotaStatus.newBuilder().setInViolation(true).setPolicy(
+            
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy.NO_INSERTS))
+        .setLimit(512L)
+        .setUsage(1024L)
         .build();
-    expectedPut.addColumn(Bytes.toBytes("u"), Bytes.toBytes("v"), 
protoQuota.toByteArray());
+    expectedPut.addColumn(Bytes.toBytes("u"), Bytes.toBytes("p"), 
protoQuota.toByteArray());
 
-    notifier.transitionTableToViolation(tn, policy);
+    notifier.transitionTable(tn, snapshot);
 
     verify(quotaTable).put(argThat(new SingleCellPutMatcher(expectedPut)));
   }
 
-  @Test
-  public void testToObservance() throws Exception {
-    final TableName tn = TableName.valueOf("notinviolation");
-    final Table quotaTable = mock(Table.class);
-    
when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
-
-    final Delete expectedDelete = new Delete(Bytes.toBytes("t." + 
tn.getNameAsString()));
-    expectedDelete.addColumn(Bytes.toBytes("u"), Bytes.toBytes("v"));
-
-    notifier.transitionTableToObservance(tn);
-
-    verify(quotaTable).delete(argThat(new 
SingleCellDeleteMatcher(expectedDelete)));
-  }
-
   /**
    * Parameterized for Puts.
    */
@@ -101,15 +90,6 @@ public class TestTableSpaceQuotaViolationNotifier {
   }
 
   /**
-   * Parameterized for Deletes.
-   */
-  private static class SingleCellDeleteMatcher extends 
SingleCellMutationMatcher<Delete> {
-    private SingleCellDeleteMatcher(Delete expected) {
-      super(expected);
-    }
-  }
-
-  /**
    * Quick hack to verify a Mutation with one column.
    */
   private static class SingleCellMutationMatcher<T> extends ArgumentMatcher<T> 
{

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java
index bb8d5cd..3d0aec6 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java
@@ -96,7 +96,7 @@ public class TestTablesWithQuotas {
     final Map<TableName,Integer> reportedRegions = new HashMap<>();
     final Map<TableName,Integer> actualRegions = new HashMap<>();
     final Configuration conf = HBaseConfiguration.create();
-    
conf.setDouble(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY, 
0.95);
+    conf.setDouble(QuotaObserverChore.QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY, 
0.95);
 
     TableName tooFewRegionsTable = TableName.valueOf("tn1");
     TableName sufficientRegionsTable = TableName.valueOf("tn2");
@@ -114,7 +114,7 @@ public class TestTablesWithQuotas {
       }
 
       @Override
-      int getNumReportedRegions(TableName table, 
QuotaViolationStore<TableName> tableStore) {
+      int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> 
tableStore) {
         return reportedRegions.get(table);
       }
     };
@@ -177,13 +177,13 @@ public class TestTablesWithQuotas {
 
     QuotaObserverChore chore = mock(QuotaObserverChore.class);
     Map<HRegionInfo,Long> regionUsage = new HashMap<>();
-    TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, 
regionUsage);
+    TableQuotaSnapshotStore store = new TableQuotaSnapshotStore(conn, chore, 
regionUsage);
 
     // A super dirty hack to verify that, after getting no regions for our 
table,
     // we bail out and start processing the next element (which there is none).
     final TablesWithQuotas tables = new TablesWithQuotas(conn, conf) {
       @Override
-      int getNumReportedRegions(TableName table, 
QuotaViolationStore<TableName> tableStore) {
+      int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> 
tableStore) {
         throw new RuntimeException("Should should not reach here");
       }
     };

Reply via email to