HBASE-16998 Implement Master-side analysis of region space reports

Adds a new Chore to the Master that analyzes the reports that are
sent by RegionServers. The Master must then, for all tables with
quotas, determine the tables that are violating quotas and move
those tables into violation. Similarly, tables no longer violating
the quota can be moved out of violation.

The Chore is the "stateful" bit, managing which tables are and
are not in violation. Everything else is just performing
computation and informing the Chore of the updated state.

Added InterfaceAudience annotations and cleaned up the QuotaObserverChore
constructor. Cleaned up some javadoc and QuotaObserverChore. Reused
the QuotaViolationStore impl objects.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a19a95df
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a19a95df
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a19a95df

Branch: refs/heads/HBASE-16961
Commit: a19a95dff66fc205efb032185944d910c8ea79b2
Parents: d9ebc1e
Author: Josh Elser <els...@apache.org>
Authored: Tue Nov 8 18:55:12 2016 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Mon Mar 20 17:46:40 2017 -0400

----------------------------------------------------------------------
 .../hadoop/hbase/quotas/QuotaRetriever.java     |  29 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  20 +
 .../hadoop/hbase/quotas/MasterQuotaManager.java |   1 +
 .../quotas/NamespaceQuotaViolationStore.java    | 127 ++++
 .../hadoop/hbase/quotas/QuotaObserverChore.java | 618 +++++++++++++++++++
 .../hbase/quotas/QuotaViolationStore.java       |  89 +++
 .../quotas/SpaceQuotaViolationNotifier.java     |  44 ++
 .../SpaceQuotaViolationNotifierForTest.java     |  50 ++
 .../hbase/quotas/TableQuotaViolationStore.java  | 127 ++++
 .../TestNamespaceQuotaViolationStore.java       | 156 +++++
 .../hbase/quotas/TestQuotaObserverChore.java    | 106 ++++
 .../TestQuotaObserverChoreWithMiniCluster.java  | 596 ++++++++++++++++++
 .../hadoop/hbase/quotas/TestQuotaTableUtil.java |   4 -
 .../quotas/TestTableQuotaViolationStore.java    | 151 +++++
 .../hbase/quotas/TestTablesWithQuotas.java      | 198 ++++++
 15 files changed, 2306 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java
index fecd2d1..8cd5cf0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Objects;
 import java.util.Queue;
 
 import org.apache.commons.logging.Log;
@@ -56,11 +57,23 @@ public class QuotaRetriever implements Closeable, 
Iterable<QuotaSettings> {
   private Connection connection;
   private Table table;
 
-  private QuotaRetriever() {
+  /**
+   * Should QuotaRetriever manage the state of the connection, or leave it be.
+   */
+  private boolean isManagedConnection = false;
+
+  QuotaRetriever() {
   }
 
   void init(final Configuration conf, final Scan scan) throws IOException {
-    this.connection = ConnectionFactory.createConnection(conf);
+    // Set this before creating the connection and passing it down to make sure
+    // it's cleaned up if we fail to construct the Scanner.
+    this.isManagedConnection = true;
+    init(ConnectionFactory.createConnection(conf), scan);
+  }
+
+  void init(final Connection conn, final Scan scan) throws IOException {
+    this.connection = Objects.requireNonNull(conn);
     this.table = this.connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME);
     try {
       scanner = table.getScanner(scan);
@@ -79,10 +92,14 @@ public class QuotaRetriever implements Closeable, 
Iterable<QuotaSettings> {
       this.table.close();
       this.table = null;
     }
-    if (this.connection != null) {
-      this.connection.close();
-      this.connection = null;
+    // Null out the connection on close() even if we didn't explicitly close it
+    // to maintain typical semantics.
+    if (isManagedConnection) {
+      if (this.connection != null) {
+        this.connection.close();
+      }
     }
+    this.connection = null;
   }
 
   public QuotaSettings next() throws IOException {
@@ -182,4 +199,4 @@ public class QuotaRetriever implements Closeable, 
Iterable<QuotaSettings> {
     scanner.init(conf, scan);
     return scanner;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index a1cbe53..861318d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -132,6 +132,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
+import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifier;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierForTest;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -370,6 +373,8 @@ public class HMaster extends HRegionServer implements 
MasterServices {
 
   // it is assigned after 'initialized' guard set to true, so should be 
volatile
   private volatile MasterQuotaManager quotaManager;
+  private SpaceQuotaViolationNotifier spaceQuotaViolationNotifier;
+  private QuotaObserverChore quotaObserverChore;
 
   private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
   private WALProcedureStore procedureStore;
@@ -894,6 +899,10 @@ public class HMaster extends HRegionServer implements 
MasterServices {
 
     status.setStatus("Starting quota manager");
     initQuotaManager();
+    this.spaceQuotaViolationNotifier = new 
SpaceQuotaViolationNotifierForTest();
+    this.quotaObserverChore = new QuotaObserverChore(this);
+    // Start the chore to read the region FS space reports and act on them
+    getChoreService().scheduleChore(quotaObserverChore);
 
     // clear the dead servers with same host name and port of online server 
because we are not
     // removing dead server with same hostname and port of rs which is trying 
to check in before
@@ -1192,6 +1201,9 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     if (this.periodicDoMetricsChore != null) {
       periodicDoMetricsChore.cancel();
     }
+    if (this.quotaObserverChore != null) {
+      quotaObserverChore.cancel();
+    }
   }
 
   /**
@@ -3300,4 +3312,12 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   public LockManager getLockManager() {
     return lockManager;
   }
+
+  public QuotaObserverChore getQuotaObserverChore() {
+    return this.quotaObserverChore;
+  }
+
+  public SpaceQuotaViolationNotifier getSpaceQuotaViolationNotifier() {
+    return this.spaceQuotaViolationNotifier;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
index 37ccdc0..206d81d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -523,6 +523,7 @@ public class MasterQuotaManager implements 
RegionStateListener {
 
   public void addRegionSize(HRegionInfo hri, long size) {
     // TODO Make proper API
+    // TODO Prevent from growing indefinitely
     regionSizes.put(hri, size);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaViolationStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaViolationStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaViolationStore.java
new file mode 100644
index 0000000..017ecec
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaViolationStore.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * {@link QuotaViolationStore} implementation for namespaces.
+ */
+@InterfaceAudience.Private
+public class NamespaceQuotaViolationStore implements 
QuotaViolationStore<String> {
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final ReadLock rlock = lock.readLock();
+  private final WriteLock wlock = lock.writeLock();
+
+  private final Connection conn;
+  private final QuotaObserverChore chore;
+  private Map<HRegionInfo,Long> regionUsage;
+
+  public NamespaceQuotaViolationStore(Connection conn, QuotaObserverChore 
chore, Map<HRegionInfo,Long> regionUsage) {
+    this.conn = Objects.requireNonNull(conn);
+    this.chore = Objects.requireNonNull(chore);
+    this.regionUsage = Objects.requireNonNull(regionUsage);
+  }
+
+  @Override
+  public SpaceQuota getSpaceQuota(String namespace) throws IOException {
+    Quotas quotas = getQuotaForNamespace(namespace);
+    if (null != quotas && quotas.hasSpace()) {
+      return quotas.getSpace();
+    }
+    return null;
+  }
+
+  /**
+   * Fetches the namespace quota. Visible for mocking/testing.
+   */
+  Quotas getQuotaForNamespace(String namespace) throws IOException {
+    return QuotaTableUtil.getNamespaceQuota(conn, namespace);
+  }
+
+  @Override
+  public ViolationState getCurrentState(String namespace) {
+    // Defer the "current state" to the chore
+    return this.chore.getNamespaceQuotaViolation(namespace);
+  }
+
+  @Override
+  public ViolationState getTargetState(String subject, SpaceQuota spaceQuota) {
+    rlock.lock();
+    try {
+      final long sizeLimitInBytes = spaceQuota.getSoftLimit();
+      long sum = 0L;
+      for (Entry<HRegionInfo,Long> entry : filterBySubject(subject)) {
+        sum += entry.getValue();
+        if (sum > sizeLimitInBytes) {
+          // Short-circuit early
+          return ViolationState.IN_VIOLATION;
+        }
+      }
+      // Observance is defined as the size of the namespace being at most the 
limit
+      return sum <= sizeLimitInBytes ? ViolationState.IN_OBSERVANCE : 
ViolationState.IN_VIOLATION;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  @Override
+  public Iterable<Entry<HRegionInfo,Long>> filterBySubject(String namespace) {
+    rlock.lock();
+    try {
+      return Iterables.filter(regionUsage.entrySet(), new 
Predicate<Entry<HRegionInfo,Long>>() {
+        @Override
+        public boolean apply(Entry<HRegionInfo,Long> input) {
+          return 
namespace.equals(input.getKey().getTable().getNamespaceAsString());
+        }
+      });
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  @Override
+  public void setCurrentState(String namespace, ViolationState state) {
+    // Defer the "current state" to the chore
+    this.chore.setNamespaceQuotaViolation(namespace, state);
+  }
+
+  @Override
+  public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) {
+    wlock.lock();
+    try {
+      this.regionUsage = Objects.requireNonNull(regionUsage);
+    } finally {
+      wlock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
new file mode 100644
index 0000000..88a6149
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+
+/**
+ * Reads the currently received Region filesystem-space use reports and acts 
on those which
+ * violate a defined quota.
+ */
+@InterfaceAudience.Private
+public class QuotaObserverChore extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(QuotaObserverChore.class);
+  static final String VIOLATION_OBSERVER_CHORE_PERIOD_KEY =
+      "hbase.master.quotas.violation.observer.chore.period";
+  static final int VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 
5 minutes in millis
+
+  static final String VIOLATION_OBSERVER_CHORE_DELAY_KEY =
+      "hbase.master.quotas.violation.observer.chore.delay";
+  static final long VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 
minute
+
+  static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY =
+      "hbase.master.quotas.violation.observer.chore.timeunit";
+  static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT = 
TimeUnit.MILLISECONDS.name();
+
+  static final String VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY =
+      "hbase.master.quotas.violation.observer.report.percent";
+  static final double VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
+
+  private final Connection conn;
+  private final Configuration conf;
+  private final MasterQuotaManager quotaManager;
+  /*
+   * Callback that changes in quota violation are passed to.
+   */
+  private final SpaceQuotaViolationNotifier violationNotifier;
+
+  /*
+   * Preserves the state of quota violations for tables and namespaces
+   */
+  private final Map<TableName,ViolationState> tableQuotaViolationStates;
+  private final Map<String,ViolationState> namespaceQuotaViolationStates;
+
+  /*
+   * Encapsulates logic for moving tables/namespaces into or out of quota 
violation
+   */
+  private QuotaViolationStore<TableName> tableViolationStore;
+  private QuotaViolationStore<String> namespaceViolationStore;
+
+  public QuotaObserverChore(HMaster master) {
+    this(
+        master.getConnection(), master.getConfiguration(),
+        master.getSpaceQuotaViolationNotifier(), 
master.getMasterQuotaManager(),
+        master);
+  }
+
+  QuotaObserverChore(
+      Connection conn, Configuration conf, SpaceQuotaViolationNotifier 
violationNotifier,
+      MasterQuotaManager quotaManager, Stoppable stopper) {
+    super(
+        QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
+        getInitialDelay(conf), getTimeUnit(conf));
+    this.conn = conn;
+    this.conf = conf;
+    this.quotaManager = quotaManager;
+    this.violationNotifier = violationNotifier;
+    this.tableQuotaViolationStates = new HashMap<>();
+    this.namespaceQuotaViolationStates = new HashMap<>();
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      _chore();
+    } catch (IOException e) {
+      LOG.warn("Failed to process quota reports and update quota violation 
state. Will retry.", e);
+    }
+  }
+
+  void _chore() throws IOException {
+    // Get the total set of tables that have quotas defined. Includes table 
quotas
+    // and tables included by namespace quotas.
+    TablesWithQuotas tablesWithQuotas = fetchAllTablesWithQuotasDefined();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Found following tables with quotas: " + tablesWithQuotas);
+    }
+
+    // The current "view" of region space use. Used henceforth.
+    final Map<HRegionInfo,Long> reportedRegionSpaceUse = 
quotaManager.snapshotRegionSizes();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use 
reports");
+    }
+
+    // Create the stores to track table and namespace violations
+    initializeViolationStores(reportedRegionSpaceUse);
+
+    // Filter out tables for which we don't have adequate regionspace reports 
yet.
+    // Important that we do this after we instantiate the stores above
+    tablesWithQuotas.filterInsufficientlyReportedTables(tableViolationStore);
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Filtered insufficiently reported tables, left with " +
+          reportedRegionSpaceUse.size() + " regions reported");
+    }
+
+    // Transition each table to/from quota violation based on the current and 
target state.
+    // Only table quotas are enacted.
+    final Set<TableName> tablesWithTableQuotas = 
tablesWithQuotas.getTableQuotaTables();
+    processTablesWithQuotas(tablesWithTableQuotas);
+
+    // For each Namespace quota, transition each table in the namespace in or 
out of violation
+    // only if a table quota violation policy has not already been applied.
+    final Set<String> namespacesWithQuotas = 
tablesWithQuotas.getNamespacesWithQuotas();
+    final Multimap<String,TableName> tablesByNamespace = 
tablesWithQuotas.getTablesByNamespace();
+    processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace);
+  }
+
+  void initializeViolationStores(Map<HRegionInfo,Long> regionSizes) {
+    Map<HRegionInfo,Long> immutableRegionSpaceUse = 
Collections.unmodifiableMap(regionSizes);
+    if (null == tableViolationStore) {
+      tableViolationStore = new TableQuotaViolationStore(conn, this, 
immutableRegionSpaceUse);
+    } else {
+      tableViolationStore.setRegionUsage(immutableRegionSpaceUse);
+    }
+    if (null == namespaceViolationStore) {
+      namespaceViolationStore = new NamespaceQuotaViolationStore(
+          conn, this, immutableRegionSpaceUse);
+    } else {
+      namespaceViolationStore.setRegionUsage(immutableRegionSpaceUse);
+    }
+  }
+
+  /**
+   * Processes each {@code TableName} which has a quota defined and moves it 
in or out of
+   * violation based on the space use.
+   *
+   * @param tablesWithTableQuotas The HBase tables which have quotas defined
+   */
+  void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) 
throws IOException {
+    for (TableName table : tablesWithTableQuotas) {
+      final SpaceQuota spaceQuota = tableViolationStore.getSpaceQuota(table);
+      if (null == spaceQuota) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Unexpectedly did not find a space quota for " + table
+              + ", maybe it was recently deleted.");
+        }
+        continue;
+      }
+      final ViolationState currentState = 
tableViolationStore.getCurrentState(table);
+      final ViolationState targetState = 
tableViolationStore.getTargetState(table, spaceQuota);
+
+      if (currentState == ViolationState.IN_VIOLATION) {
+        if (targetState == ViolationState.IN_OBSERVANCE) {
+          LOG.info(table + " moving into observance of table space quota.");
+          transitionTableToObservance(table);
+          tableViolationStore.setCurrentState(table, 
ViolationState.IN_OBSERVANCE);
+        } else if (targetState == ViolationState.IN_VIOLATION) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(table + " remains in violation of quota.");
+          }
+          tableViolationStore.setCurrentState(table, 
ViolationState.IN_VIOLATION);
+        }
+      } else if (currentState == ViolationState.IN_OBSERVANCE) {
+        if (targetState == ViolationState.IN_VIOLATION) {
+          LOG.info(table + " moving into violation of table space quota.");
+          transitionTableToViolation(table, getViolationPolicy(spaceQuota));
+          tableViolationStore.setCurrentState(table, 
ViolationState.IN_VIOLATION);
+        } else if (targetState == ViolationState.IN_OBSERVANCE) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(table + " remains in observance of quota.");
+          }
+          tableViolationStore.setCurrentState(table, 
ViolationState.IN_OBSERVANCE);
+        }
+      }
+    }
+  }
+
+  /**
+   * Processes each namespace which has a quota defined and moves all of the 
tables contained
+   * in that namespace into or out of violation of the quota. Tables which are 
already in
+   * violation of a quota at the table level which <em>also</em> reside 
in a namespace
+   * with a violated quota will not have the namespace quota enacted. The 
table quota takes
+   * priority over the namespace quota.
+   *
+   * @param namespacesWithQuotas The set of namespaces that have quotas defined
+   * @param tablesByNamespace A mapping of namespaces and the tables contained 
in those namespaces
+   */
+  void processNamespacesWithQuotas(
+      final Set<String> namespacesWithQuotas,
+      final Multimap<String,TableName> tablesByNamespace) throws IOException {
+    for (String namespace : namespacesWithQuotas) {
+      // Get the quota definition for the namespace
+      final SpaceQuota spaceQuota = 
namespaceViolationStore.getSpaceQuota(namespace);
+      if (null == spaceQuota) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Could not get Namespace space quota for " + namespace
+              + ", maybe it was recently deleted.");
+        }
+        continue;
+      }
+      final ViolationState currentState = 
namespaceViolationStore.getCurrentState(namespace);
+      final ViolationState targetState = 
namespaceViolationStore.getTargetState(namespace, spaceQuota);
+      // When in observance, check if we need to move to violation.
+      if (ViolationState.IN_OBSERVANCE == currentState) {
+        if (ViolationState.IN_VIOLATION == targetState) {
+          for (TableName tableInNS : tablesByNamespace.get(namespace)) {
+            if (ViolationState.IN_VIOLATION == 
tableViolationStore.getCurrentState(tableInNS)) {
+              // Table-level quota violation policy is being applied here.
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Not activating Namespace violation policy because 
Table violation"
+                    + " policy is already in effect for " + tableInNS);
+              }
+              continue;
+            } else {
+              LOG.info(tableInNS + " moving into violation of namespace space 
quota");
+              transitionTableToViolation(tableInNS, 
getViolationPolicy(spaceQuota));
+            }
+          }
+        } else {
+          // still in observance
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(namespace + " remains in observance of quota.");
+          }
+        }
+      } else if (ViolationState.IN_VIOLATION == currentState) {
+        // When in violation, check if we need to move to observance.
+        if (ViolationState.IN_OBSERVANCE == targetState) {
+          for (TableName tableInNS : tablesByNamespace.get(namespace)) {
+            if (ViolationState.IN_VIOLATION == 
tableViolationStore.getCurrentState(tableInNS)) {
+              // Table-level quota violation policy is being applied here.
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Not activating Namespace violation policy because 
Table violation"
+                    + " policy is already in effect for " + tableInNS);
+              }
+              continue;
+            } else {
+              LOG.info(tableInNS + " moving into observance of namespace space 
quota");
+              transitionTableToObservance(tableInNS);
+            }
+          }
+        } else {
+          // Remains in violation
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(namespace + " remains in violation of quota.");
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Computes the set of all tables that have quotas defined. This includes 
tables with quotas
+   * explicitly set on them, in addition to tables that exist in namespaces which 
have a quota
+   * defined.
+   */
+  TablesWithQuotas fetchAllTablesWithQuotasDefined() throws IOException {
+    final Scan scan = QuotaTableUtil.makeScan(null);
+    final QuotaRetriever scanner = new QuotaRetriever();
+    final TablesWithQuotas tablesWithQuotas = new TablesWithQuotas(conn, conf);
+    try {
+      scanner.init(conn, scan);
+      for (QuotaSettings quotaSettings : scanner) {
+        // Only one of namespace and tablename should be 'null'
+        final String namespace = quotaSettings.getNamespace();
+        final TableName tableName = quotaSettings.getTableName();
+        if (QuotaType.SPACE != quotaSettings.getQuotaType()) {
+          continue;
+        }
+
+        if (null != namespace) {
+          assert null == tableName;
+          // Collect all of the tables in the namespace
+          TableName[] tablesInNS = conn.getAdmin()
+              .listTableNamesByNamespace(namespace);
+          for (TableName tableUnderNs : tablesInNS) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Adding " + tableUnderNs + " under " +  namespace
+                  + " as having a namespace quota");
+            }
+            tablesWithQuotas.addNamespaceQuotaTable(tableUnderNs);
+          }
+        } else {
+          assert null != tableName;
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Adding " + tableName + " as having table quota.");
+          }
+          // namespace is already null, must be a non-null tableName
+          tablesWithQuotas.addTableQuotaTable(tableName);
+        }
+      }
+      return tablesWithQuotas;
+    } finally {
+      if (null != scanner) {
+        scanner.close();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  QuotaViolationStore<TableName> getTableViolationStore() {
+    return tableViolationStore;
+  }
+
+  @VisibleForTesting
+  QuotaViolationStore<String> getNamespaceViolationStore() {
+    return namespaceViolationStore;
+  }
+
+  /**
+   * Transitions the given table to violation of its quota, enabling the 
violation policy.
+   */
+  private void transitionTableToViolation(TableName table, 
SpaceViolationPolicy violationPolicy) {
+    this.violationNotifier.transitionTableToViolation(table, violationPolicy);
+  }
+
+  /**
+   * Transitions the given table to observance of its quota, disabling the 
violation policy.
+   */
+  private void transitionTableToObservance(TableName table) {
+    this.violationNotifier.transitionTableToObservance(table);
+  }
+
+  /**
+   * Fetch the {@link ViolationState} for the given table.
+   */
+  ViolationState getTableQuotaViolation(TableName table) {
+    // TODO Can one instance of a Chore be executed concurrently?
+    ViolationState state = this.tableQuotaViolationStates.get(table);
+    if (null == state) {
+      // No tracked state implies observance.
+      return ViolationState.IN_OBSERVANCE;
+    }
+    return state;
+  }
+
+  /**
+   * Stores the quota violation state for the given table.
+   */
+  void setTableQuotaViolation(TableName table, ViolationState state) {
+    this.tableQuotaViolationStates.put(table, state);
+  }
+
+  /**
+   * Fetches the {@link ViolationState} for the given namespace.
+   */
+  ViolationState getNamespaceQuotaViolation(String namespace) {
+    // TODO Can one instance of a Chore be executed concurrently?
+    ViolationState state = this.namespaceQuotaViolationStates.get(namespace);
+    if (null == state) {
+      // No tracked state implies observance.
+      return ViolationState.IN_OBSERVANCE;
+    }
+    return state;
+  }
+
+  /**
+   * Stores the quota violation state for the given namespace.
+   */
+  void setNamespaceQuotaViolation(String namespace, ViolationState state) {
+    this.namespaceQuotaViolationStates.put(namespace, state);
+  }
+
+  /**
+   * Converts the violation policy carried by the given {@link SpaceQuota} protobuf into a
+   * {@link SpaceViolationPolicy}.
+   *
+   * @param spaceQuota The space quota to read the violation policy from.
+   * @return The violation policy set on the quota.
+   * @throws IllegalArgumentException If the SpaceQuota lacks a ViolationPolicy
+   */
+  SpaceViolationPolicy getViolationPolicy(SpaceQuota spaceQuota) {
+    if (spaceQuota.hasViolationPolicy()) {
+      return ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy());
+    }
+    // A space quota without a policy cannot be acted upon.
+    throw new IllegalArgumentException("SpaceQuota had no associated violation policy: "
+        + spaceQuota);
+  }
+
+  /**
+   * Extracts the period for the chore from the configuration.
+   *
+   * @param conf The configuration object.
+   * @return The configured chore period or the default value.
+   */
+  static int getPeriod(Configuration conf) {
+    return conf.getInt(VIOLATION_OBSERVER_CHORE_PERIOD_KEY,
+        VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT);
+  }
+
+  /**
+   * Extracts the initial delay for the chore from the configuration.
+   *
+   * @param conf The configuration object.
+   * @return The configured chore initial delay or the default value.
+   */
+  static long getInitialDelay(Configuration conf) {
+    return conf.getLong(VIOLATION_OBSERVER_CHORE_DELAY_KEY,
+        VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT);
+  }
+
+  /**
+   * Extracts the time unit for the chore period and initial delay from the 
configuration. The
+   * configuration value for {@link #VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY} 
must correspond to
+   * a {@link TimeUnit} value.
+   *
+   * @param conf The configuration object.
+   * @return The configured time unit for the chore period and initial delay 
or the default value.
+   */
+  static TimeUnit getTimeUnit(Configuration conf) {
+    return TimeUnit.valueOf(conf.get(VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY,
+        VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
+  }
+
+  /**
+   * Extracts the percent of Regions for a table to have been reported to 
enable quota violation
+   * state change.
+   *
+   * @param conf The configuration object.
+   * @return The percent of regions reported to use.
+   */
+  static Double getRegionReportPercent(Configuration conf) {
+    return conf.getDouble(VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY,
+        VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
+  }
+
+  /**
+   * A container which encapsulates the tables which have a table quota and the tables which
+   * are contained in a namespace which has a namespace quota.
+   */
+  static class TablesWithQuotas {
+    private final Set<TableName> tablesWithTableQuotas = new HashSet<>();
+    private final Set<TableName> tablesWithNamespaceQuotas = new HashSet<>();
+    private final Connection conn;
+    private final Configuration conf;
+
+    /**
+     * @param conn The connection used to look up the regions of a table; must not be null.
+     * @param conf The configuration supplying the region report threshold; must not be null.
+     */
+    public TablesWithQuotas(Connection conn, Configuration conf) {
+      this.conn = Objects.requireNonNull(conn);
+      this.conf = Objects.requireNonNull(conf);
+    }
+
+    Configuration getConfiguration() {
+      return conf;
+    }
+
+    /**
+     * Adds a table with a table quota.
+     */
+    public void addTableQuotaTable(TableName tn) {
+      tablesWithTableQuotas.add(tn);
+    }
+
+    /**
+     * Adds a table with a namespace quota.
+     */
+    public void addNamespaceQuotaTable(TableName tn) {
+      tablesWithNamespaceQuotas.add(tn);
+    }
+
+    /**
+     * Returns true if the given table has a table quota.
+     */
+    public boolean hasTableQuota(TableName tn) {
+      return tablesWithTableQuotas.contains(tn);
+    }
+
+    /**
+     * Returns true if the table exists in a namespace with a namespace quota.
+     */
+    public boolean hasNamespaceQuota(TableName tn) {
+      return tablesWithNamespaceQuotas.contains(tn);
+    }
+
+    /**
+     * Returns an unmodifiable view of all tables with table quotas.
+     */
+    public Set<TableName> getTableQuotaTables() {
+      return Collections.unmodifiableSet(tablesWithTableQuotas);
+    }
+
+    /**
+     * Returns an unmodifiable view of all tables in namespaces that have
+     * namespace quotas.
+     */
+    public Set<TableName> getNamespaceQuotaTables() {
+      return Collections.unmodifiableSet(tablesWithNamespaceQuotas);
+    }
+
+    /**
+     * Returns a new set holding the namespace of every table that was added as a table
+     * with a namespace quota.
+     */
+    public Set<String> getNamespacesWithQuotas() {
+      Set<String> namespaces = new HashSet<>();
+      for (TableName tn : tablesWithNamespaceQuotas) {
+        namespaces.add(tn.getNamespaceAsString());
+      }
+      return namespaces;
+    }
+
+    /**
+     * Returns a new multimap of all tables that reside in a namespace with a namespace
+     * quota, grouped by the namespace itself.
+     */
+    public Multimap<String,TableName> getTablesByNamespace() {
+      Multimap<String,TableName> tablesByNS = HashMultimap.create();
+      for (TableName tn : tablesWithNamespaceQuotas) {
+        tablesByNS.put(tn.getNamespaceAsString(), tn);
+      }
+      return tablesByNS;
+    }
+
+    /**
+     * Filters out all tables for which the Master currently doesn't have enough region space
+     * reports received from RegionServers yet. A table is dropped from both the table-quota
+     * and namespace-quota sets when it has no regions, or when the ratio of reported regions
+     * to total regions is below the configured threshold.
+     *
+     * @param tableStore The store used to count the regions reported for each table.
+     * @throws IOException If the regions of a table cannot be determined.
+     */
+    public void filterInsufficientlyReportedTables(QuotaViolationStore<TableName> tableStore)
+        throws IOException {
+      final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
+      // A table can be present in both sets; remember what was already evaluated so that the
+      // region lookup is performed at most once per table, whether it is kept or removed.
+      final Set<TableName> tablesEvaluated = new HashSet<>();
+      final Set<TableName> tablesToRemove = new HashSet<>();
+      for (TableName table : Iterables.concat(tablesWithTableQuotas, tablesWithNamespaceQuotas)) {
+        // Don't recompute a table we've already computed
+        if (!tablesEvaluated.add(table)) {
+          continue;
+        }
+        final int numRegionsInTable = getNumRegions(table);
+        // If the table doesn't exist (no regions), bail out.
+        if (0 == numRegionsInTable) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Filtering " + table + " because no regions were reported");
+          }
+          tablesToRemove.add(table);
+          continue;
+        }
+        final int reportedRegionsInQuota = getNumReportedRegions(table, tableStore);
+        final double ratioReported = ((double) reportedRegionsInQuota) / numRegionsInTable;
+        if (ratioReported < percentRegionsReportedThreshold) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota  + " of " +
+                numRegionsInTable + " were reported.");
+          }
+          tablesToRemove.add(table);
+        } else if (LOG.isTraceEnabled()) {
+          LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota  + " of " +
+              numRegionsInTable + " were reported.");
+        }
+      }
+      // Removal is deferred until the iteration over the (concatenated) sets has finished.
+      for (TableName tableToRemove : tablesToRemove) {
+        tablesWithTableQuotas.remove(tableToRemove);
+        tablesWithNamespaceQuotas.remove(tableToRemove);
+      }
+    }
+
+    /**
+     * Computes the total number of regions in a table.
+     *
+     * @param table The table whose regions should be counted.
+     * @return The number of regions, or zero when no region list is returned.
+     * @throws IOException If the regions cannot be fetched or the Admin cannot be closed.
+     */
+    int getNumRegions(TableName table) throws IOException {
+      // The Admin obtained from the Connection must be closed by the caller. The type is
+      // fully qualified so that this block does not depend on an additional import.
+      try (org.apache.hadoop.hbase.client.Admin admin = this.conn.getAdmin()) {
+        List<HRegionInfo> regions = admin.getTableRegions(table);
+        return null == regions ? 0 : regions.size();
+      }
+    }
+
+    /**
+     * Computes the number of regions reported for a table.
+     *
+     * @param table The table whose reported regions should be counted.
+     * @param tableStore The store holding the reported region usage.
+     * @return The number of entries the store returns for the table.
+     */
+    int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore)
+        throws IOException {
+      return Iterables.size(tableStore.filterBySubject(table));
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(32);
+      sb.append(getClass().getSimpleName())
+          .append(": tablesWithTableQuotas=")
+          .append(this.tablesWithTableQuotas)
+          .append(", tablesWithNamespaceQuotas=")
+          .append(this.tablesWithNamespaceQuotas);
+      return sb.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
new file mode 100644
index 0000000..381ac8e
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+
+/**
+ * A common interface for computing and storing space quota observance/violation for
+ * entities. The type parameter {@code T} is the type which identifies an entity (the
+ * "subject").
+ *
+ * An entity is presently a table or a namespace.
+ */
+@InterfaceAudience.Private
+public interface QuotaViolationStore<T> {
+
+  /**
+   * The current state of a subject (a table or a namespace) with respect to the policy set
+   * forth by a quota.
+   */
+  @InterfaceAudience.Private
+  public enum ViolationState {
+    IN_VIOLATION,
+    IN_OBSERVANCE,
+  }
+
+  /**
+   * Fetch the Quota for the given {@code subject}. May be null.
+   *
+   * @param subject The object for which the quota should be fetched
+   * @return The space quota for the {@code subject}, possibly null
+   * @throws IOException If the quota cannot be fetched
+   */
+  SpaceQuota getSpaceQuota(T subject) throws IOException;
+
+  /**
+   * Returns the current {@link ViolationState} for the given {@code subject}.
+   *
+   * @param subject The object for which the quota violation state should be fetched
+   * @return The current state of the {@code subject}
+   */
+  ViolationState getCurrentState(T subject);
+
+  /**
+   * Computes the target {@link ViolationState} for the given {@code subject} and
+   * {@code spaceQuota}.
+   *
+   * @param subject The object for which to determine the target quota violation state
+   * @param spaceQuota The quota "definition" for the {@code subject}
+   * @return The state the {@code subject} should be in given the {@code spaceQuota}
+   */
+  ViolationState getTargetState(T subject, SpaceQuota spaceQuota);
+
+  /**
+   * Filters the region usage held by this store, returning the entries which match the
+   * given <code>subject</code>.
+   *
+   * @param subject The filter criteria. Only regions belonging to this parameter will be
+   *     returned
+   * @return The matching entries, each pairing a region with its reported usage
+   */
+  Iterable<Entry<HRegionInfo,Long>> filterBySubject(T subject);
+
+  /**
+   * Persists the current {@link ViolationState} for the {@code subject}.
+   *
+   * @param subject The object which the {@link ViolationState} is being persisted for
+   * @param state The current {@link ViolationState} of the {@code subject}
+   */
+  void setCurrentState(T subject, ViolationState state);
+
+  /**
+   * Updates {@code this} with the latest snapshot of filesystem use by region.
+   *
+   * @param regionUsage A map of {@code HRegionInfo} objects to their filesystem usage in
+   *     bytes
+   */
+  void setRegionUsage(Map<HRegionInfo,Long> regionUsage);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
new file mode 100644
index 0000000..bccf519
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An interface which abstracts away the action taken to enable or disable
+ * a space quota violation policy across the HBase cluster.
+ */
+@InterfaceAudience.Private
+public interface SpaceQuotaViolationNotifier {
+
+  /**
+   * Instructs the cluster that the given table is in violation of a space quota. The
+   * provided violation policy is the action which should be taken on the table.
+   *
+   * @param tableName The name of the table in violation of the quota.
+   * @param violationPolicy The policy which should be enacted on the table.
+   */
+  void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy);
+
+  /**
+   * Instructs the cluster that the given table is in observance of any applicable space
+   * quota.
+   *
+   * @param tableName The name of the table in observance.
+   */
+  void transitionTableToObservance(TableName tableName);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
new file mode 100644
index 0000000..4ab9834
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link SpaceQuotaViolationNotifier} implementation for tests. Instead of acting on a
+ * cluster, it records in memory which tables were moved into violation, together with
+ * their violation policy, so that tests can inspect the outcome.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNotifier {
+
+  private final Map<TableName,SpaceViolationPolicy> tablesInViolation = new HashMap<>();
+
+  /**
+   * Records the table as being in violation with the given policy, replacing any policy
+   * recorded earlier for the same table.
+   */
+  @Override
+  public void transitionTableToViolation(
+      TableName tableName, SpaceViolationPolicy violationPolicy) {
+    this.tablesInViolation.put(tableName, violationPolicy);
+  }
+
+  /**
+   * Forgets any violation recorded for the table.
+   */
+  @Override
+  public void transitionTableToObservance(TableName tableName) {
+    this.tablesInViolation.remove(tableName);
+  }
+
+  /**
+   * Returns a copy of the recorded violations; later transitions do not alter the copy.
+   */
+  public Map<TableName,SpaceViolationPolicy> snapshotTablesInViolation() {
+    final Map<TableName,SpaceViolationPolicy> snapshot = new HashMap<>();
+    snapshot.putAll(this.tablesInViolation);
+    return snapshot;
+  }
+
+  /**
+   * Drops every recorded violation.
+   */
+  public void clearTableViolations() {
+    tablesInViolation.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
new file mode 100644
index 0000000..6aba1cf
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * {@link QuotaViolationStore} for tables.
+ */
+@InterfaceAudience.Private
+public class TableQuotaViolationStore implements 
QuotaViolationStore<TableName> {
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final ReadLock rlock = lock.readLock();
+  private final WriteLock wlock = lock.writeLock();
+
+  private final Connection conn;
+  private final QuotaObserverChore chore;
+  private Map<HRegionInfo,Long> regionUsage;
+
+  public TableQuotaViolationStore(Connection conn, QuotaObserverChore chore, 
Map<HRegionInfo,Long> regionUsage) {
+    this.conn = Objects.requireNonNull(conn);
+    this.chore = Objects.requireNonNull(chore);
+    this.regionUsage = Objects.requireNonNull(regionUsage);
+  }
+
+  @Override
+  public SpaceQuota getSpaceQuota(TableName subject) throws IOException {
+    Quotas quotas = getQuotaForTable(subject);
+    if (null != quotas && quotas.hasSpace()) {
+      return quotas.getSpace();
+    }
+    return null;
+  }
+  /**
+   * Fetches the table quota. Visible for mocking/testing.
+   */
+  Quotas getQuotaForTable(TableName table) throws IOException {
+    return QuotaTableUtil.getTableQuota(conn, table);
+  }
+
+  @Override
+  public ViolationState getCurrentState(TableName table) {
+    // Defer the "current state" to the chore
+    return chore.getTableQuotaViolation(table);
+  }
+
+  @Override
+  public ViolationState getTargetState(TableName table, SpaceQuota spaceQuota) 
{
+    rlock.lock();
+    try {
+      final long sizeLimitInBytes = spaceQuota.getSoftLimit();
+      long sum = 0L;
+      for (Entry<HRegionInfo,Long> entry : filterBySubject(table)) {
+        sum += entry.getValue();
+        if (sum > sizeLimitInBytes) {
+          // Short-circuit early
+          return ViolationState.IN_VIOLATION;
+        }
+      }
+      // Observance is defined as the size of the table being less than the 
limit
+      return sum <= sizeLimitInBytes ? ViolationState.IN_OBSERVANCE : 
ViolationState.IN_VIOLATION;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  @Override
+  public Iterable<Entry<HRegionInfo,Long>> filterBySubject(TableName table) {
+    rlock.lock();
+    try {
+      return Iterables.filter(regionUsage.entrySet(), new 
Predicate<Entry<HRegionInfo,Long>>() {
+        @Override
+        public boolean apply(Entry<HRegionInfo,Long> input) {
+          return table.equals(input.getKey().getTable());
+        }
+      });
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  @Override
+  public void setCurrentState(TableName table, ViolationState state) {
+    // Defer the "current state" to the chore
+    this.chore.setTableQuotaViolation(table, state);
+  }
+
+  @Override
+  public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) {
+    wlock.lock();
+    try {
+      this.regionUsage = Objects.requireNonNull(regionUsage);
+    } finally {
+      wlock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
new file mode 100644
index 0000000..8182513
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static com.google.common.collect.Iterables.size;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test class for {@link NamespaceQuotaViolationStore}.
+ *
+ * The store under test is built from a mocked {@link Connection}, a mocked
+ * {@link QuotaObserverChore} and a region report map which the tests fill in directly.
+ * The tests cover fetching the space quota, computing the target violation state from the
+ * summed region sizes of a namespace, and filtering region reports by namespace.
+ */
+@Category(SmallTests.class)
+public class TestNamespaceQuotaViolationStore {
+  private static final long ONE_MEGABYTE = 1024L * 1024L;
+
+  private Connection conn;
+  private QuotaObserverChore chore;
+  private Map<HRegionInfo, Long> regionReports; // handed to the store; tests add entries later
+  private NamespaceQuotaViolationStore store;
+
+  @Before
+  public void setup() {
+    conn = mock(Connection.class);
+    chore = mock(QuotaObserverChore.class);
+    regionReports = new HashMap<>();
+    store = new NamespaceQuotaViolationStore(conn, chore, regionReports);
+  }
+
+  @Test
+  public void testGetSpaceQuota() throws Exception {
+    NamespaceQuotaViolationStore mockStore = mock(NamespaceQuotaViolationStore.class);
+    when(mockStore.getSpaceQuota(any(String.class))).thenCallRealMethod(); // real logic under test
+
+    Quotas quotaWithSpace = Quotas.newBuilder().setSpace(
+        SpaceQuota.newBuilder()
+            .setSoftLimit(1024L)
+            .setViolationPolicy(QuotaProtos.SpaceViolationPolicy.DISABLE)
+            .build())
+        .build();
+    Quotas quotaWithoutSpace = Quotas.newBuilder().build();
+
+    AtomicReference<Quotas> quotaRef = new AtomicReference<>(); // swapped between assertions
+    when(mockStore.getQuotaForNamespace(any(String.class))).then(new Answer<Quotas>() {
+      @Override
+      public Quotas answer(InvocationOnMock invocation) throws Throwable {
+        return quotaRef.get();
+      }
+    });
+
+    quotaRef.set(quotaWithSpace);
+    assertEquals(quotaWithSpace.getSpace(), mockStore.getSpaceQuota("ns"));
+    quotaRef.set(quotaWithoutSpace);
+    assertNull(mockStore.getSpaceQuota("ns")); // no space component yields null
+  }
+
+  @Test
+  public void testTargetViolationState() {
+    final String NS = "ns";
+    TableName tn1 = TableName.valueOf(NS, "tn1");
+    TableName tn2 = TableName.valueOf(NS, "tn2");
+    TableName tn3 = TableName.valueOf("tn3"); // no namespace given, so not part of NS
+    SpaceQuota quota = SpaceQuota.newBuilder()
+        .setSoftLimit(ONE_MEGABYTE)
+        .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(SpaceViolationPolicy.DISABLE))
+        .build();
+
+    // Create some junk data to filter. Makes sure it's so large that it would
+    // immediately violate the quota if it were (wrongly) counted towards NS.
+    for (int i = 0; i < 3; i++) {
+      regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)),
+          5L * ONE_MEGABYTE);
+    }
+
+    regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(0), Bytes.toBytes(1)), 1024L * 512L);
+    regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), Bytes.toBytes(2)), 1024L * 256L);
+
+    // Below the quota: 512KB + 256KB = 768KB against a 1MB limit
+    assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota));
+
+    regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(2), Bytes.toBytes(3)), 1024L * 256L);
+
+    // Equal to the quota is still in observance: 768KB + 256KB = 1MB
+    assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota));
+
+    regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(3), Bytes.toBytes(4)), 1024L);
+
+    // Exceeds the quota (1MB + 1KB), should be in violation
+    assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(NS, quota));
+  }
+
+  @Test
+  public void testFilterRegionsByNamespace() {
+    TableName tn1 = TableName.valueOf("foo");
+    TableName tn2 = TableName.valueOf("sn", "bar");
+    TableName tn3 = TableName.valueOf("ns", "foo");
+    TableName tn4 = TableName.valueOf("ns", "bar");
+
+    assertEquals(0, size(store.filterBySubject("asdf"))); // nothing has been reported yet
+
+    for (int i = 0; i < 5; i++) {
+      regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
+    }
+    for (int i = 0; i < 3; i++) {
+      regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
+    }
+    for (int i = 0; i < 10; i++) {
+      regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
+    }
+    for (int i = 0; i < 8; i++) {
+      regionReports.put(new HRegionInfo(tn4, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L);
+    }
+    assertEquals(26, regionReports.size()); // 5 + 3 + 10 + 8
+    assertEquals(5, size(store.filterBySubject(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR)));
+    assertEquals(3, size(store.filterBySubject("sn")));
+    assertEquals(18, size(store.filterBySubject("ns"))); // tn3 (10) + tn4 (8)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a19a95df/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
new file mode 100644
index 0000000..db549e4
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Non-HBase cluster unit tests for {@link QuotaObserverChore}; the chore is a Mockito mock.
+ */
+@Category(SmallTests.class)
+public class TestQuotaObserverChore {
+  private Connection conn;
+  private QuotaObserverChore chore;
+
+  @Before
+  public void setup() throws Exception {
+    conn = mock(Connection.class);
+    chore = mock(QuotaObserverChore.class);
+    // Set up some rules to call the real method on the mock.
+    when(chore.getViolationPolicy(any(SpaceQuota.class))).thenCallRealMethod();
+  }
+
+  @Test
+  public void testNumRegionsForTable() {
+    TableName tn1 = TableName.valueOf("t1");
+    TableName tn2 = TableName.valueOf("t2");
+    TableName tn3 = TableName.valueOf("t3");
+
+    final int numTable1Regions = 10;
+    final int numTable2Regions = 15;
+    final int numTable3Regions = 8;
+    Map<HRegionInfo,Long> regionReports = new HashMap<>();  // all values are 0L; keys matter here
+    for (int i = 0; i < numTable1Regions; i++) {
+      regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L);
+    }
+
+    for (int i = 0; i < numTable2Regions; i++) {
+      regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L);
+    }
+
+    for (int i = 0; i < numTable3Regions; i++) {
+      regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L);
+    }
+
+    TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, regionReports);
+    when(chore.getTableViolationStore()).thenReturn(store);  // stub the mock to return this store
+
+    assertEquals(numTable1Regions, Iterables.size(store.filterBySubject(tn1)));
+    assertEquals(numTable2Regions, Iterables.size(store.filterBySubject(tn2)));
+    assertEquals(numTable3Regions, Iterables.size(store.filterBySubject(tn3)));
+  }
+
+  @Test
+  public void testExtractViolationPolicy() {
+    for (SpaceViolationPolicy policy : SpaceViolationPolicy.values()) {
+      SpaceQuota spaceQuota = SpaceQuota.newBuilder()
+          .setSoftLimit(1024L)
+          .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
+          .build();
+      assertEquals(policy, chore.getViolationPolicy(spaceQuota));  // real method, see setup()
+    }
+    SpaceQuota malformedQuota = SpaceQuota.newBuilder()
+        .setSoftLimit(1024L)
+        .build();  // deliberately built without a violation policy
+    try {
+      chore.getViolationPolicy(malformedQuota);
+      fail("Should have thrown an IllegalArgumentException.");
+    } catch (IllegalArgumentException e) {
+      // Pass: this is the exception the test expects for the policy-less quota
+    }
+  }
+}
\ No newline at end of file

Reply via email to