http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java index 8b127d9..973ac8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java @@ -37,9 +37,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; import com.google.common.annotations.VisibleForTesting; @@ -54,51 +53,51 @@ import com.google.common.collect.Multimap; @InterfaceAudience.Private public class QuotaObserverChore extends ScheduledChore { private static final Log LOG = LogFactory.getLog(QuotaObserverChore.class); - static final String VIOLATION_OBSERVER_CHORE_PERIOD_KEY = - "hbase.master.quotas.violation.observer.chore.period"; - static final int VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis + static final String QUOTA_OBSERVER_CHORE_PERIOD_KEY = + "hbase.master.quotas.observer.chore.period"; + static final int QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis - static final String VIOLATION_OBSERVER_CHORE_DELAY_KEY = - "hbase.master.quotas.violation.observer.chore.delay"; - static final long VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute + static final String QUOTA_OBSERVER_CHORE_DELAY_KEY = + "hbase.master.quotas.observer.chore.delay"; + static final long QUOTA_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute - static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY = - "hbase.master.quotas.violation.observer.chore.timeunit"; - static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); + static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY = + "hbase.master.quotas.observer.chore.timeunit"; + static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); - static final String VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY = - "hbase.master.quotas.violation.observer.report.percent"; - static final double VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95; + static final String QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY = + "hbase.master.quotas.observer.report.percent"; + static final double QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95; private final Connection conn; private final Configuration conf; private final MasterQuotaManager quotaManager; /* - * Callback that changes in quota violation are passed to. + * Callback that changes in quota snapshots are passed to. */ - private final SpaceQuotaViolationNotifier violationNotifier; + private final SpaceQuotaSnapshotNotifier snapshotNotifier; /* - * Preserves the state of quota violations for tables and namespaces + * Preserves the state of quota snapshots for tables and namespaces */ - private final Map<TableName,ViolationState> tableQuotaViolationStates; - private final Map<String,ViolationState> namespaceQuotaViolationStates; + private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots; + private final Map<String,SpaceQuotaSnapshot> namespaceQuotaSnapshots; /* - * Encapsulates logic for moving tables/namespaces into or out of quota violation + * Encapsulates logic for tracking the state of a table/namespace WRT space quotas */ - private QuotaViolationStore<TableName> tableViolationStore; - private QuotaViolationStore<String> namespaceViolationStore; + private QuotaSnapshotStore<TableName> tableSnapshotStore; + private QuotaSnapshotStore<String> namespaceSnapshotStore; public QuotaObserverChore(HMaster master) { this( master.getConnection(), master.getConfiguration(), - master.getSpaceQuotaViolationNotifier(), master.getMasterQuotaManager(), + master.getSpaceQuotaSnapshotNotifier(), master.getMasterQuotaManager(), master); } QuotaObserverChore( - Connection conn, Configuration conf, SpaceQuotaViolationNotifier violationNotifier, + Connection conn, Configuration conf, SpaceQuotaSnapshotNotifier snapshotNotifier, MasterQuotaManager quotaManager, Stoppable stopper) { super( QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf), @@ -106,17 +105,20 @@ public class QuotaObserverChore extends ScheduledChore { this.conn = conn; this.conf = conf; this.quotaManager = quotaManager; - this.violationNotifier = violationNotifier; - this.tableQuotaViolationStates = new HashMap<>(); - this.namespaceQuotaViolationStates = new HashMap<>(); + this.snapshotNotifier = Objects.requireNonNull(snapshotNotifier); + this.tableQuotaSnapshots = new HashMap<>(); + this.namespaceQuotaSnapshots = new HashMap<>(); } @Override protected void chore() { try { + if (LOG.isTraceEnabled()) { + LOG.trace("Refreshing space quotas in RegionServer"); + } _chore(); } catch (IOException e) { - LOG.warn("Failed to process quota reports and update quota violation state. Will retry.", e); + LOG.warn("Failed to process quota reports and update quota state. Will retry.", e); } } @@ -134,12 +136,12 @@ public class QuotaObserverChore extends ScheduledChore { LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports"); } - // Create the stores to track table and namespace violations - initializeViolationStores(reportedRegionSpaceUse); + // Create the stores to track table and namespace snapshots + initializeSnapshotStores(reportedRegionSpaceUse); // Filter out tables for which we don't have adequate regionspace reports yet. // Important that we do this after we instantiate the stores above - tablesWithQuotas.filterInsufficientlyReportedTables(tableViolationStore); + tablesWithQuotas.filterInsufficientlyReportedTables(tableSnapshotStore); if (LOG.isTraceEnabled()) { LOG.trace("Filtered insufficiently reported tables, left with " + @@ -158,18 +160,18 @@ public class QuotaObserverChore extends ScheduledChore { processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace); } - void initializeViolationStores(Map<HRegionInfo,Long> regionSizes) { + void initializeSnapshotStores(Map<HRegionInfo,Long> regionSizes) { Map<HRegionInfo,Long> immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes); - if (null == tableViolationStore) { - tableViolationStore = new TableQuotaViolationStore(conn, this, immutableRegionSpaceUse); + if (null == tableSnapshotStore) { + tableSnapshotStore = new TableQuotaSnapshotStore(conn, this, immutableRegionSpaceUse); } else { - tableViolationStore.setRegionUsage(immutableRegionSpaceUse); + tableSnapshotStore.setRegionUsage(immutableRegionSpaceUse); } - if (null == namespaceViolationStore) { - namespaceViolationStore = new NamespaceQuotaViolationStore( + if (null == namespaceSnapshotStore) { + namespaceSnapshotStore = new NamespaceQuotaSnapshotStore( conn, this, immutableRegionSpaceUse); } else { - namespaceViolationStore.setRegionUsage(immutableRegionSpaceUse); + namespaceSnapshotStore.setRegionUsage(immutableRegionSpaceUse); } } @@ -181,7 +183,7 @@ public class QuotaObserverChore extends ScheduledChore { */ void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) throws IOException { for (TableName table : tablesWithTableQuotas) { - final SpaceQuota spaceQuota = tableViolationStore.getSpaceQuota(table); + final SpaceQuota spaceQuota = tableSnapshotStore.getSpaceQuota(table); if (null == spaceQuota) { if (LOG.isDebugEnabled()) { LOG.debug("Unexpectedly did not find a space quota for " + table @@ -189,32 +191,12 @@ public class QuotaObserverChore extends ScheduledChore { } continue; } - final ViolationState currentState = tableViolationStore.getCurrentState(table); - final ViolationState targetState = tableViolationStore.getTargetState(table, spaceQuota); - - if (currentState == ViolationState.IN_VIOLATION) { - if (targetState == ViolationState.IN_OBSERVANCE) { - LOG.info(table + " moving into observance of table space quota."); - transitionTableToObservance(table); - tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE); - } else if (targetState == ViolationState.IN_VIOLATION) { - if (LOG.isTraceEnabled()) { - LOG.trace(table + " remains in violation of quota."); - } - tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION); - } - } else if (currentState == ViolationState.IN_OBSERVANCE) { - if (targetState == ViolationState.IN_VIOLATION) { - LOG.info(table + " moving into violation of table space quota."); - transitionTableToViolation(table, getViolationPolicy(spaceQuota)); - tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION); - } else if (targetState == ViolationState.IN_OBSERVANCE) { - if (LOG.isTraceEnabled()) { - LOG.trace(table + " remains in observance of quota."); - } - tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE); - } + final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(table); + final SpaceQuotaSnapshot targetSnapshot = tableSnapshotStore.getTargetState(table, spaceQuota); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing " + table + " with current=" + currentSnapshot + ", target=" + targetSnapshot); } + updateTableQuota(table, currentSnapshot, targetSnapshot); } } @@ -233,7 +215,7 @@ public class QuotaObserverChore extends ScheduledChore { final Multimap<String,TableName> tablesByNamespace) throws IOException { for (String namespace : namespacesWithQuotas) { // Get the quota definition for the namespace - final SpaceQuota spaceQuota = namespaceViolationStore.getSpaceQuota(namespace); + final SpaceQuota spaceQuota = namespaceSnapshotStore.getSpaceQuota(namespace); if (null == spaceQuota) { if (LOG.isDebugEnabled()) { LOG.debug("Could not get Namespace space quota for " + namespace @@ -241,50 +223,117 @@ public class QuotaObserverChore extends ScheduledChore { } continue; } - final ViolationState currentState = namespaceViolationStore.getCurrentState(namespace); - final ViolationState targetState = namespaceViolationStore.getTargetState(namespace, spaceQuota); - // When in observance, check if we need to move to violation. - if (ViolationState.IN_OBSERVANCE == currentState) { - if (ViolationState.IN_VIOLATION == targetState) { - for (TableName tableInNS : tablesByNamespace.get(namespace)) { - if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) { - // Table-level quota violation policy is being applied here. - if (LOG.isTraceEnabled()) { - LOG.trace("Not activating Namespace violation policy because Table violation" - + " policy is already in effect for " + tableInNS); - } - continue; - } else { - LOG.info(tableInNS + " moving into violation of namespace space quota"); - transitionTableToViolation(tableInNS, getViolationPolicy(spaceQuota)); + final SpaceQuotaSnapshot currentSnapshot = namespaceSnapshotStore.getCurrentState(namespace); + final SpaceQuotaSnapshot targetSnapshot = namespaceSnapshotStore.getTargetState(namespace, spaceQuota); + updateNamespaceQuota(namespace, currentSnapshot, targetSnapshot, tablesByNamespace); + } + } + + /** + * Updates the hbase:quota table with the new quota policy for this <code>table</code> + * if necessary. + * + * @param table The table being checked + * @param currentSnapshot The state of the quota on this table from the previous invocation. + * @param targetSnapshot The state the quota should be in for this table. + */ + void updateTableQuota( + TableName table, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot) + throws IOException { + final SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus(); + final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus(); + + // If we're changing something, log it. + if (!currentSnapshot.equals(targetSnapshot)) { + // If the target is none, we're moving out of violation. Update the hbase:quota table + if (!targetStatus.isInViolation()) { + if (LOG.isDebugEnabled()) { + LOG.debug(table + " moving into observance of table space quota."); + } + } else if (LOG.isDebugEnabled()) { + // We're either moving into violation or changing violation policies + LOG.debug(table + " moving into violation of table space quota with policy of " + targetStatus.getPolicy()); + } + + this.snapshotNotifier.transitionTable(table, targetSnapshot); + // Update it in memory + tableSnapshotStore.setCurrentState(table, targetSnapshot); + } else if (LOG.isTraceEnabled()) { + // Policies are the same, so we have nothing to do except log this. Don't need to re-update the quota table + if (!currentStatus.isInViolation()) { + LOG.trace(table + " remains in observance of quota."); + } else { + LOG.trace(table + " remains in violation of quota."); + } + } + } + + /** + * Updates the hbase:quota table with the target quota policy for this <code>namespace</code> + * if necessary. + * + * @param namespace The namespace being checked + * @param currentSnapshot The state of the quota on this namespace from the previous invocation + * @param targetSnapshot The state the quota should be in for this namespace + * @param tablesByNamespace A mapping of tables in namespaces. + */ + void updateNamespaceQuota( + String namespace, SpaceQuotaSnapshot currentSnapshot, SpaceQuotaSnapshot targetSnapshot, + final Multimap<String,TableName> tablesByNamespace) throws IOException { + final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus(); + + // When the policies differ, we need to move into or out of violatino + if (!currentSnapshot.equals(targetSnapshot)) { + // We want to have a policy of "NONE", moving out of violation + if (!targetStatus.isInViolation()) { + for (TableName tableInNS : tablesByNamespace.get(namespace)) { + if (!tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) { + // Table-level quota violation policy is being applied here. + if (LOG.isTraceEnabled()) { + LOG.trace("Not activating Namespace violation policy because a Table violation" + + " policy is already in effect for " + tableInNS); } - } - } else { - // still in observance - if (LOG.isTraceEnabled()) { - LOG.trace(namespace + " remains in observance of quota."); + } else { + LOG.info(tableInNS + " moving into observance of namespace space quota"); + this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot); } } - } else if (ViolationState.IN_VIOLATION == currentState) { - // When in violation, check if we need to move to observance. - if (ViolationState.IN_OBSERVANCE == targetState) { - for (TableName tableInNS : tablesByNamespace.get(namespace)) { - if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) { - // Table-level quota violation policy is being applied here. - if (LOG.isTraceEnabled()) { - LOG.trace("Not activating Namespace violation policy because Table violation" - + " policy is already in effect for " + tableInNS); - } - continue; - } else { - LOG.info(tableInNS + " moving into observance of namespace space quota"); - transitionTableToObservance(tableInNS); + } else { + // Moving tables in the namespace into violation or to a different violation policy + for (TableName tableInNS : tablesByNamespace.get(namespace)) { + if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) { + // Table-level quota violation policy is being applied here. + if (LOG.isTraceEnabled()) { + LOG.trace("Not activating Namespace violation policy because a Table violation" + + " policy is already in effect for " + tableInNS); } + } else { + LOG.info(tableInNS + " moving into violation of namespace space quota with policy " + targetStatus.getPolicy()); + this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot); } - } else { - // Remains in violation - if (LOG.isTraceEnabled()) { - LOG.trace(namespace + " remains in violation of quota."); + } + } + } else { + // Policies are the same + if (!targetStatus.isInViolation()) { + // Both are NONE, so we remain in observance + if (LOG.isTraceEnabled()) { + LOG.trace(namespace + " remains in observance of quota."); + } + } else { + // Namespace quota is still in violation, need to enact if the table quota is not taking priority. + for (TableName tableInNS : tablesByNamespace.get(namespace)) { + // Does a table policy exist + if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) { + // Table-level quota violation policy is being applied here. + if (LOG.isTraceEnabled()) { + LOG.trace("Not activating Namespace violation policy because Table violation" + + " policy is already in effect for " + tableInNS); + } + } else { + // No table policy, so enact namespace policy + LOG.info(tableInNS + " moving into violation of namespace space quota"); + this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot); } } } @@ -340,39 +389,24 @@ public class QuotaObserverChore extends ScheduledChore { } @VisibleForTesting - QuotaViolationStore<TableName> getTableViolationStore() { - return tableViolationStore; + QuotaSnapshotStore<TableName> getTableSnapshotStore() { + return tableSnapshotStore; } @VisibleForTesting - QuotaViolationStore<String> getNamespaceViolationStore() { - return namespaceViolationStore; + QuotaSnapshotStore<String> getNamespaceSnapshotStore() { + return namespaceSnapshotStore; } /** - * Transitions the given table to violation of its quota, enabling the violation policy. + * Fetches the {@link SpaceQuotaSnapshot} for the given table. */ - private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy) - throws IOException { - this.violationNotifier.transitionTableToViolation(table, violationPolicy); - } - - /** - * Transitions the given table to observance of its quota, disabling the violation policy. - */ - private void transitionTableToObservance(TableName table) throws IOException { - this.violationNotifier.transitionTableToObservance(table); - } - - /** - * Fetch the {@link ViolationState} for the given table. - */ - ViolationState getTableQuotaViolation(TableName table) { + SpaceQuotaSnapshot getTableQuotaSnapshot(TableName table) { // TODO Can one instance of a Chore be executed concurrently? - ViolationState state = this.tableQuotaViolationStates.get(table); + SpaceQuotaSnapshot state = this.tableQuotaSnapshots.get(table); if (null == state) { // No tracked state implies observance. - return ViolationState.IN_OBSERVANCE; + return QuotaSnapshotStore.NO_QUOTA; } return state; } @@ -380,19 +414,19 @@ public class QuotaObserverChore extends ScheduledChore { /** * Stores the quota violation state for the given table. */ - void setTableQuotaViolation(TableName table, ViolationState state) { - this.tableQuotaViolationStates.put(table, state); + void setTableQuotaViolation(TableName table, SpaceQuotaSnapshot snapshot) { + this.tableQuotaSnapshots.put(table, snapshot); } /** - * Fetches the {@link ViolationState} for the given namespace. + * Fetches the {@link SpaceQuotaSnapshot} for the given namespace. */ - ViolationState getNamespaceQuotaViolation(String namespace) { + SpaceQuotaSnapshot getNamespaceQuotaSnapshot(String namespace) { // TODO Can one instance of a Chore be executed concurrently? - ViolationState state = this.namespaceQuotaViolationStates.get(namespace); + SpaceQuotaSnapshot state = this.namespaceQuotaSnapshots.get(namespace); if (null == state) { // No tracked state implies observance. - return ViolationState.IN_OBSERVANCE; + return QuotaSnapshotStore.NO_QUOTA; } return state; } @@ -400,20 +434,8 @@ public class QuotaObserverChore extends ScheduledChore { /** * Stores the quota violation state for the given namespace. */ - void setNamespaceQuotaViolation(String namespace, ViolationState state) { - this.namespaceQuotaViolationStates.put(namespace, state); - } - - /** - * Extracts the {@link SpaceViolationPolicy} from the serialized {@link Quotas} protobuf. - * @throws IllegalArgumentException If the SpaceQuota lacks a ViolationPolicy - */ - SpaceViolationPolicy getViolationPolicy(SpaceQuota spaceQuota) { - if (!spaceQuota.hasViolationPolicy()) { - throw new IllegalArgumentException("SpaceQuota had no associated violation policy: " - + spaceQuota); - } - return ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()); + void setNamespaceQuotaSnapshot(String namespace, SpaceQuotaSnapshot snapshot) { + this.namespaceQuotaSnapshots.put(namespace, snapshot); } /** @@ -423,8 +445,8 @@ public class QuotaObserverChore extends ScheduledChore { * @return The configured chore period or the default value. */ static int getPeriod(Configuration conf) { - return conf.getInt(VIOLATION_OBSERVER_CHORE_PERIOD_KEY, - VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT); + return conf.getInt(QUOTA_OBSERVER_CHORE_PERIOD_KEY, + QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT); } /** @@ -434,21 +456,21 @@ public class QuotaObserverChore extends ScheduledChore { * @return The configured chore initial delay or the default value. */ static long getInitialDelay(Configuration conf) { - return conf.getLong(VIOLATION_OBSERVER_CHORE_DELAY_KEY, - VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT); + return conf.getLong(QUOTA_OBSERVER_CHORE_DELAY_KEY, + QUOTA_OBSERVER_CHORE_DELAY_DEFAULT); } /** * Extracts the time unit for the chore period and initial delay from the configuration. The - * configuration value for {@link #VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to + * configuration value for {@link #QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to * a {@link TimeUnit} value. * * @param conf The configuration object. * @return The configured time unit for the chore period and initial delay or the default value. */ static TimeUnit getTimeUnit(Configuration conf) { - return TimeUnit.valueOf(conf.get(VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY, - VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT)); + return TimeUnit.valueOf(conf.get(QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY, + QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT)); } /** @@ -459,8 +481,8 @@ public class QuotaObserverChore extends ScheduledChore { * @return The percent of regions reported to use. */ static Double getRegionReportPercent(Configuration conf) { - return conf.getDouble(VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY, - VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT); + return conf.getDouble(QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY, + QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT); } /** @@ -549,7 +571,7 @@ public class QuotaObserverChore extends ScheduledChore { * Filters out all tables for which the Master currently doesn't have enough region space * reports received from RegionServers yet. */ - public void filterInsufficientlyReportedTables(QuotaViolationStore<TableName> tableStore) + public void filterInsufficientlyReportedTables(QuotaSnapshotStore<TableName> tableStore) throws IOException { final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration()); Set<TableName> tablesToRemove = new HashSet<>(); @@ -572,12 +594,12 @@ public class QuotaObserverChore extends ScheduledChore { if (ratioReported < percentRegionsReportedThreshold) { if (LOG.isTraceEnabled()) { LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota + " of " + - numRegionsInTable + " were reported."); + numRegionsInTable + " regions were reported."); } tablesToRemove.add(table); } else if (LOG.isTraceEnabled()) { LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota + " of " + - numRegionsInTable + " were reported."); + numRegionsInTable + " regions were reported."); } } for (TableName tableToRemove : tablesToRemove) { @@ -600,7 +622,7 @@ public class QuotaObserverChore extends ScheduledChore { /** * Computes the number of regions reported for a table. */ - int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore) + int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) throws IOException { return Iterables.size(tableStore.filterBySubject(table)); }
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java new file mode 100644 index 0000000..8b0b3a7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSnapshotStore.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; + +/** + * A common interface for computing and storing space quota observance/violation for entities. + * + * An entity is presently a table or a namespace. + */ +@InterfaceAudience.Private +public interface QuotaSnapshotStore<T> { + + /** + * The current state of a table with respect to the policy set forth by a quota. + */ + @InterfaceAudience.Private + public enum ViolationState { + IN_VIOLATION, + IN_OBSERVANCE, + } + + /** + * Singleton to represent a table without a quota defined. It is never in violation. + */ + public static final SpaceQuotaSnapshot NO_QUOTA = new SpaceQuotaSnapshot( + SpaceQuotaStatus.notInViolation(), -1, -1); + + /** + * Fetch the Quota for the given {@code subject}. May be null. + * + * @param subject The object for which the quota should be fetched + */ + SpaceQuota getSpaceQuota(T subject) throws IOException; + + /** + * Returns the current {@link SpaceQuotaSnapshot} for the given {@code subject}. + * + * @param subject The object which the quota snapshot should be fetched + */ + SpaceQuotaSnapshot getCurrentState(T subject); + + /** + * Computes the target {@link SpaceQuotaSnapshot} for the given {@code subject} and + * {@code spaceQuota}. + * + * @param subject The object which to determine the target SpaceQuotaSnapshot of + * @param spaceQuota The quota "definition" for the {@code subject} + */ + SpaceQuotaSnapshot getTargetState(T subject, SpaceQuota spaceQuota); + + /** + * Filters the provided <code>regions</code>, returning those which match the given + * <code>subject</code>. + * + * @param subject The filter criteria. Only regions belonging to this parameter will be returned + */ + Iterable<Entry<HRegionInfo,Long>> filterBySubject(T subject); + + /** + * Persists the current {@link SpaceQuotaSnapshot} for the {@code subject}. + * + * @param subject The object which the {@link SpaceQuotaSnapshot} is being persisted for + * @param state The current state of the {@code subject} + */ + void setCurrentState(T subject, SpaceQuotaSnapshot state); + + /** + * Updates {@code this} with the latest snapshot of filesystem use by region. + * + * @param regionUsage A map of {@code HRegionInfo} objects to their filesystem usage in bytes + */ + void setRegionUsage(Map<HRegionInfo,Long> regionUsage); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java deleted file mode 100644 index 381ac8e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.quotas; - -import java.io.IOException; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; - -/** - * A common interface for computing and storing space quota observance/violation for entities. - * - * An entity is presently a table or a namespace. - */ -@InterfaceAudience.Private -public interface QuotaViolationStore<T> { - - /** - * The current state of a table with respect to the policy set forth by a quota. - */ - @InterfaceAudience.Private - public enum ViolationState { - IN_VIOLATION, - IN_OBSERVANCE, - } - - /** - * Fetch the Quota for the given {@code subject}. May be null. - * - * @param subject The object for which the quota should be fetched - */ - SpaceQuota getSpaceQuota(T subject) throws IOException; - - /** - * Returns the current {@link ViolationState} for the given {@code subject}. - * - * @param subject The object which the quota violation state should be fetched - */ - ViolationState getCurrentState(T subject); - - /** - * Computes the target {@link ViolationState} for the given {@code subject} and - * {@code spaceQuota}. - * - * @param subject The object which to determine the target quota violation state of - * @param spaceQuota The quota "definition" for the {@code subject} - */ - ViolationState getTargetState(T subject, SpaceQuota spaceQuota); - - /** - * Filters the provided <code>regions</code>, returning those which match the given - * <code>subject</code>. - * - * @param subject The filter criteria. Only regions belonging to this parameter will be returned - */ - Iterable<Entry<HRegionInfo,Long>> filterBySubject(T subject); - - /** - * Persists the current {@link ViolationState} for the {@code subject}. - * - * @param subject The object which the {@link ViolationState} is being persisted for - * @param state The current {@link ViolationState} of the {@code subject} - */ - void setCurrentState(T subject, ViolationState state); - - /** - * Updates {@code this} with the latest snapshot of filesystem use by region. - * - * @param regionUsage A map of {@code HRegionInfo} objects to their filesystem usage in bytes - */ - void setRegionUsage(Map<HRegionInfo,Long> regionUsage); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java index 9a8edb9..1c82808 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java @@ -20,24 +20,29 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.util.Bytes; + +import com.google.common.annotations.VisibleForTesting; /** * A manager for filesystem space quotas in the RegionServer. * - * This class is responsible for reading quota violation policies from the quota - * table and then enacting them on the given table. + * This class is the centralized point for what a RegionServer knows about space quotas + * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot} + * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not + * being violated). Both of these are sensitive on when they were last updated. The + * {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and updates + * the state on <code>this</code>. */ @InterfaceAudience.Private public class RegionServerSpaceQuotaManager { @@ -45,12 +50,23 @@ public class RegionServerSpaceQuotaManager { private final RegionServerServices rsServices; - private SpaceQuotaViolationPolicyRefresherChore spaceQuotaRefresher; - private Map<TableName,SpaceViolationPolicy> enforcedPolicies; + private SpaceQuotaRefresherChore spaceQuotaRefresher; + private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots; private boolean started = false; + private ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies; + private SpaceViolationPolicyEnforcementFactory factory; public RegionServerSpaceQuotaManager(RegionServerServices rsServices) { + this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance()); + } + + @VisibleForTesting + RegionServerSpaceQuotaManager( + RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) { this.rsServices = Objects.requireNonNull(rsServices); + this.factory = factory; + this.enforcedPolicies = new ConcurrentHashMap<>(); + this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>()); } public synchronized void start() throws IOException { @@ -59,8 +75,12 @@ public class RegionServerSpaceQuotaManager { return; } - spaceQuotaRefresher = new SpaceQuotaViolationPolicyRefresherChore(this); - enforcedPolicies = new HashMap<>(); + if (started) { + LOG.warn("RegionServerSpaceQuotaManager has already been started!"); + return; + } + this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection()); + rsServices.getChoreService().scheduleChore(spaceQuotaRefresher); started = true; } @@ -79,91 +99,136 @@ public class RegionServerSpaceQuotaManager { return started; } - Connection getConnection() { - return rsServices.getConnection(); + /** + * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view + * of what the RegionServer thinks the table's utilization is. + */ + public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() { + return new HashMap<>(currentQuotaSnapshots.get()); } /** - * Returns the collection of tables which have quota violation policies enforced on - * this RegionServer. + * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer. + * + * @param newSnapshots The space quota snapshots. */ - public synchronized Map<TableName,SpaceViolationPolicy> getActiveViolationPolicyEnforcements() - throws IOException { - return new HashMap<>(this.enforcedPolicies); + public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) { + currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots)); } /** - * Wrapper around {@link QuotaTableUtil#extractViolationPolicy(Result, Map)} for testing. + * Creates an object well-suited for the RegionServer to use in verifying active policies. */ - void extractViolationPolicy(Result result, Map<TableName,SpaceViolationPolicy> activePolicies) { - QuotaTableUtil.extractViolationPolicy(result, activePolicies); + public ActivePolicyEnforcement getActiveEnforcements() { + return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices); } /** - * Reads all quota violation policies which are to be enforced from the quota table. - * - * @return The collection of tables which are in violation of their quota and the policy which - * should be enforced. + * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into + * {@link SpaceViolationPolicy}s. */ - public Map<TableName, SpaceViolationPolicy> getViolationPoliciesToEnforce() throws IOException { - try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME); - ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) { - Map<TableName,SpaceViolationPolicy> activePolicies = new HashMap<>(); - for (Result result : scanner) { - try { - extractViolationPolicy(result, activePolicies); - } catch (IllegalArgumentException e) { - final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow()); - LOG.error(msg, e); - throw new IOException(msg, e); - } + public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() { + final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = + copyActiveEnforcements(); + final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>(); + for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) { + final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot(); + if (null != snapshot) { + policies.put(entry.getKey(), snapshot); } - return activePolicies; } + return policies; } /** * Enforces the given violationPolicy on the given table in this RegionServer. */ - synchronized void enforceViolationPolicy( - TableName tableName, SpaceViolationPolicy violationPolicy) { + public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) { + SpaceQuotaStatus status = snapshot.getQuotaStatus(); + if (!status.isInViolation()) { + throw new IllegalStateException( + tableName + " is not in violation. Violation policy should not be enabled."); + } if (LOG.isTraceEnabled()) { LOG.trace( "Enabling violation policy enforcement on " + tableName - + " with policy " + violationPolicy); + + " with policy " + status.getPolicy()); + } + // Construct this outside of the lock + final SpaceViolationPolicyEnforcement enforcement = getFactory().create( + getRegionServerServices(), tableName, snapshot); + // "Enables" the policy + // TODO Should this synchronize on the actual table name instead of the map? That would allow + // policy enable/disable on different tables to happen concurrently. As written now, only one + // table will be allowed to transition at a time. + synchronized (enforcedPolicies) { + try { + enforcement.enable(); + } catch (IOException e) { + LOG.error("Failed to enable space violation policy for " + tableName + + ". This table will not enter violation.", e); + return; + } + enforcedPolicies.put(tableName, enforcement); } - // Enact the policy - enforceOnRegionServer(tableName, violationPolicy); - // Publicize our enacting of the policy - enforcedPolicies.put(tableName, violationPolicy); } /** - * Enacts the given violation policy on this table in the RegionServer. + * Disables enforcement on any violation policy on the given <code>tableName</code>. */ - void enforceOnRegionServer(TableName tableName, SpaceViolationPolicy violationPolicy) { - throw new UnsupportedOperationException("TODO"); + public void disableViolationPolicyEnforcement(TableName tableName) { + if (LOG.isTraceEnabled()) { + LOG.trace("Disabling violation policy enforcement on " + tableName); + } + // "Disables" the policy + // TODO Should this synchronize on the actual table name instead of the map? + synchronized (enforcedPolicies) { + SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName); + if (null != enforcement) { + try { + enforcement.disable(); + } catch (IOException e) { + LOG.error("Failed to disable space violation policy for " + tableName + + ". This table will remain in violation.", e); + enforcedPolicies.put(tableName, enforcement); + } + } + } } /** - * Disables enforcement on any violation policy on the given <code>tableName</code>. + * Returns whether or not compactions should be disabled for the given <code>tableName</code> per + * a space quota violation policy. A convenience method. + * + * @param tableName The table to check + * @return True if compactions should be disabled for the table, false otherwise. */ - synchronized void disableViolationPolicyEnforcement(TableName tableName) { - if (LOG.isTraceEnabled()) { - LOG.trace("Disabling violation policy enforcement on " + tableName); + public boolean areCompactionsDisabled(TableName tableName) { + SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName)); + if (null != enforcement) { + return enforcement.areCompactionsDisabled(); } - disableOnRegionServer(tableName); - enforcedPolicies.remove(tableName); + return false; } /** - * Disables any violation policy on this table in the RegionServer. + * Returns the collection of tables which have quota violation policies enforced on + * this RegionServer. */ - void disableOnRegionServer(TableName tableName) { - throw new UnsupportedOperationException("TODO"); + Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() { + // Allows reads to happen concurrently (or while the map is being updated) + return new HashMap<>(this.enforcedPolicies); } RegionServerServices getRegionServerServices() { return rsServices; } + + Connection getConnection() { + return rsServices.getConnection(); + } + + SpaceViolationPolicyEnforcementFactory getFactory() { + return factory; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java new file mode 100644 index 0000000..904903f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceLimitingException.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * An Exception that is thrown when a space quota is in violation. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class SpaceLimitingException extends QuotaExceededException { + private static final long serialVersionUID = 2319438922387583600L; + private static final Log LOG = LogFactory.getLog(SpaceLimitingException.class); + private static final String MESSAGE_PREFIX = SpaceLimitingException.class.getName() + ": "; + + private final String policyName; + + public SpaceLimitingException(String msg) { + super(parseMessage(msg)); + + // Hack around ResponseConverter expecting to invoke a single-arg String constructor + // on this class + if (null != msg) { + for (SpaceViolationPolicy definedPolicy : SpaceViolationPolicy.values()) { + if (msg.indexOf(definedPolicy.name()) != -1) { + policyName = definedPolicy.name(); + return; + } + } + } + policyName = null; + } + + public SpaceLimitingException(String policyName, String msg) { + super(msg); + this.policyName = policyName; + } + + public SpaceLimitingException(String policyName, String msg, Throwable e) { + super(msg, e); + this.policyName = policyName; + } + + /** + * Returns the violation policy in effect. + * + * @return The violation policy in effect. + */ + public String getViolationPolicy() { + return this.policyName; + } + + private static String parseMessage(String originalMessage) { + // Serialization of the exception places a duplicate class name. Try to strip that off if it + // exists. Best effort... Looks something like: + // "org.apache.hadoop.hbase.quotas.SpaceLimitingException: NO_INSERTS A Put is disallowed due + // to a space quota." + if (null != originalMessage && originalMessage.startsWith(MESSAGE_PREFIX)) { + // If it starts with the class name, rip off the policy too. + try { + int index = originalMessage.indexOf(' ', MESSAGE_PREFIX.length()); + return originalMessage.substring(index + 1); + } catch (Exception e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Failed to trim exception message", e); + } + } + } + return originalMessage; + } + + @Override + public String getMessage() { + return (null == policyName ? "(unknown policy)" : policyName) + " " + super.getMessage(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java new file mode 100644 index 0000000..e1a2693 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaRefresherChore.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager} + * with information from the hbase:quota. + */ +@InterfaceAudience.Private +public class SpaceQuotaRefresherChore extends ScheduledChore { + private static final Log LOG = LogFactory.getLog(SpaceQuotaRefresherChore.class); + + static final String POLICY_REFRESHER_CHORE_PERIOD_KEY = + "hbase.regionserver.quotas.policy.refresher.chore.period"; + static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis + + static final String POLICY_REFRESHER_CHORE_DELAY_KEY = + "hbase.regionserver.quotas.policy.refresher.chore.delay"; + static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute + + static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY = + "hbase.regionserver.quotas.policy.refresher.chore.timeunit"; + static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); + + static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY = + "hbase.regionserver.quotas.policy.refresher.report.percent"; + static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95; + + private final RegionServerSpaceQuotaManager manager; + private final Connection conn; + + public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) { + super(SpaceQuotaRefresherChore.class.getSimpleName(), + manager.getRegionServerServices(), + getPeriod(manager.getRegionServerServices().getConfiguration()), + getInitialDelay(manager.getRegionServerServices().getConfiguration()), + getTimeUnit(manager.getRegionServerServices().getConfiguration())); + this.manager = manager; + this.conn = conn; + } + + @Override + protected void chore() { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Reading current quota snapshots from hbase:quota."); + } + // Get the snapshots that the quota manager is currently aware of + final Map<TableName, SpaceQuotaSnapshot> currentSnapshots = + getManager().copyQuotaSnapshots(); + // Read the new snapshots from the quota table + final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable(); + if (LOG.isTraceEnabled()) { + LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, " + + "read " + newSnapshots.size() + " from the quota table."); + } + // Iterate over each new quota snapshot + for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) { + final TableName tableName = entry.getKey(); + final SpaceQuotaSnapshot newSnapshot = entry.getValue(); + // May be null! + final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName); + if (LOG.isTraceEnabled()) { + LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot); + } + if (!newSnapshot.equals(currentSnapshot)) { + // We have a new snapshot. We might need to enforce it or disable the enforcement + if (!isInViolation(currentSnapshot) && newSnapshot.getQuotaStatus().isInViolation()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Enabling " + newSnapshot + " on " + tableName); + } + getManager().enforceViolationPolicy(tableName, newSnapshot); + } + if (isInViolation(currentSnapshot) && !newSnapshot.getQuotaStatus().isInViolation()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Removing quota violation policy on " + tableName); + } + getManager().disableViolationPolicyEnforcement(tableName); + } + } + } + + // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing + // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master, + // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets + // us avoid having to do anything special with currentSnapshots here. + + // Update the snapshots in the manager + getManager().updateQuotaSnapshot(newSnapshots); + } catch (IOException e) { + LOG.warn( + "Caught exception while refreshing enforced quota violation policies, will retry.", e); + } + } + + /** + * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null. + * If the snapshot is null, this is interpreted as no snapshot which implies not in violation. + * + * @param snapshot The snapshot to operate on. + * @return true if the snapshot is in violation, false otherwise. + */ + boolean isInViolation(SpaceQuotaSnapshot snapshot) { + if (null == snapshot) { + return false; + } + return snapshot.getQuotaStatus().isInViolation(); + } + + /** + * Reads all quota snapshots from the quota table. + * + * @return The current "view" of space use by each table. + */ + public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException { + try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME); + ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) { + Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>(); + for (Result result : scanner) { + try { + extractQuotaSnapshot(result, snapshots); + } catch (IllegalArgumentException e) { + final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow()); + LOG.error(msg, e); + throw new IOException(msg, e); + } + } + return snapshots; + } + } + + /** + * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing. + */ + void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) { + QuotaTableUtil.extractQuotaSnapshot(result, snapshots); + } + + Connection getConnection() { + return conn; + } + + RegionServerSpaceQuotaManager getManager() { + return manager; + } + + /** + * Extracts the period for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore period or the default value. + */ + static int getPeriod(Configuration conf) { + return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY, + POLICY_REFRESHER_CHORE_PERIOD_DEFAULT); + } + + /** + * Extracts the initial delay for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore initial delay or the default value. + */ + static long getInitialDelay(Configuration conf) { + return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY, + POLICY_REFRESHER_CHORE_DELAY_DEFAULT); + } + + /** + * Extracts the time unit for the chore period and initial delay from the configuration. The + * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to + * a {@link TimeUnit} value. + * + * @param conf The configuration object. + * @return The configured time unit for the chore period and initial delay or the default value. + */ + static TimeUnit getTimeUnit(Configuration conf) { + return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY, + POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT)); + } + + /** + * Extracts the percent of Regions for a table to have been reported to enable quota violation + * state change. + * + * @param conf The configuration object. + * @return The percent of regions reported to use. + */ + static Double getRegionReportPercent(Configuration conf) { + return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY, + POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java new file mode 100644 index 0000000..46e93c0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifier.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; + +/** + * An interface which abstract away the action taken to enable or disable + * a space quota violation policy across the HBase cluster. Implementations + * must have a no-args constructor. + */ +@InterfaceAudience.Private +public interface SpaceQuotaSnapshotNotifier { + + /** + * Initializes the notifier. + */ + void initialize(Connection conn); + + /** + * Informs the cluster of the current state of a space quota for a table. + * + * @param tableName The name of the table. + * @param snapshot The details of the space quota utilization. + */ + void transitionTable(TableName tableName, SpaceQuotaSnapshot snapshot) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java new file mode 100644 index 0000000..cb34529 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.util.Objects; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Factory for creating {@link SpaceQuotaSnapshotNotifier} implementations. Implementations + * must have a no-args constructor. + */ +@InterfaceAudience.Private +public class SpaceQuotaSnapshotNotifierFactory { + private static final SpaceQuotaSnapshotNotifierFactory INSTANCE = + new SpaceQuotaSnapshotNotifierFactory(); + + public static final String SNAPSHOT_NOTIFIER_KEY = "hbase.master.quota.snapshot.notifier.impl"; + public static final Class<? extends SpaceQuotaSnapshotNotifier> SNAPSHOT_NOTIFIER_DEFAULT = + TableSpaceQuotaSnapshotNotifier.class; + + // Private + private SpaceQuotaSnapshotNotifierFactory() {} + + public static SpaceQuotaSnapshotNotifierFactory getInstance() { + return INSTANCE; + } + + /** + * Instantiates the {@link SpaceQuotaSnapshotNotifier} implementation as defined in the + * configuration provided. + * + * @param conf Configuration object + * @return The SpaceQuotaSnapshotNotifier implementation + * @throws IllegalArgumentException if the class could not be instantiated + */ + public SpaceQuotaSnapshotNotifier create(Configuration conf) { + Class<? extends SpaceQuotaSnapshotNotifier> clz = Objects.requireNonNull(conf) + .getClass(SNAPSHOT_NOTIFIER_KEY, SNAPSHOT_NOTIFIER_DEFAULT, + SpaceQuotaSnapshotNotifier.class); + try { + return clz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("Failed to instantiate the implementation", e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java deleted file mode 100644 index 261dea7..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.quotas; - -import java.io.IOException; - -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; - -/** - * An interface which abstract away the action taken to enable or disable - * a space quota violation policy across the HBase cluster. Implementations - * must have a no-args constructor. - */ -@InterfaceAudience.Private -public interface SpaceQuotaViolationNotifier { - - /** - * Initializes the notifier. - */ - void initialize(Connection conn); - - /** - * Instructs the cluster that the given table is in violation of a space quota. The - * provided violation policy is the action which should be taken on the table. - * - * @param tableName The name of the table in violation of the quota. - * @param violationPolicy The policy which should be enacted on the table. - */ - void transitionTableToViolation( - TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException; - - /** - * Instructs the cluster that the given table is in observance of any applicable space quota. - * - * @param tableName The name of the table in observance. - */ - void transitionTableToObservance(TableName tableName) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java deleted file mode 100644 index 43f5513..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.quotas; - -import java.util.Objects; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Factory for creating {@link SpaceQuotaViolationNotifier} implementations. Implementations - * must have a no-args constructor. - */ -@InterfaceAudience.Private -public class SpaceQuotaViolationNotifierFactory { - private static final SpaceQuotaViolationNotifierFactory INSTANCE = - new SpaceQuotaViolationNotifierFactory(); - - public static final String VIOLATION_NOTIFIER_KEY = "hbase.master.quota.violation.notifier.impl"; - public static final Class<? extends SpaceQuotaViolationNotifier> VIOLATION_NOTIFIER_DEFAULT = - SpaceQuotaViolationNotifierForTest.class; - - // Private - private SpaceQuotaViolationNotifierFactory() {} - - public static SpaceQuotaViolationNotifierFactory getInstance() { - return INSTANCE; - } - - /** - * Instantiates the {@link SpaceQuotaViolationNotifier} implementation as defined in the - * configuration provided. - * - * @param conf Configuration object - * @return The SpaceQuotaViolationNotifier implementation - * @throws IllegalArgumentException if the class could not be instantiated - */ - public SpaceQuotaViolationNotifier create(Configuration conf) { - Class<? extends SpaceQuotaViolationNotifier> clz = Objects.requireNonNull(conf) - .getClass(VIOLATION_NOTIFIER_KEY, VIOLATION_NOTIFIER_DEFAULT, - SpaceQuotaViolationNotifier.class); - try { - return clz.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new IllegalArgumentException("Failed to instantiate the implementation", e); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java deleted file mode 100644 index 65dc979..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.quotas; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; - -/** - * A SpaceQuotaViolationNotifier implementation for verifying testing. - */ -@InterfaceAudience.Private -public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNotifier { - - private final Map<TableName,SpaceViolationPolicy> tablesInViolation = new HashMap<>(); - - @Override - public void initialize(Connection conn) {} - - @Override - public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) { - tablesInViolation.put(tableName, violationPolicy); - } - - @Override - public void transitionTableToObservance(TableName tableName) { - tablesInViolation.remove(tableName); - } - - public Map<TableName,SpaceViolationPolicy> snapshotTablesInViolation() { - return new HashMap<>(this.tablesInViolation); - } - - public void clearTableViolations() { - this.tablesInViolation.clear(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java deleted file mode 100644 index 778ea0b..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.quotas; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * A {@link ScheduledChore} which periodically updates a local copy of tables which have - * space quota violation policies enacted on them. - */ -@InterfaceAudience.Private -public class SpaceQuotaViolationPolicyRefresherChore extends ScheduledChore { - private static final Log LOG = LogFactory.getLog(SpaceQuotaViolationPolicyRefresherChore.class); - - static final String POLICY_REFRESHER_CHORE_PERIOD_KEY = - "hbase.regionserver.quotas.policy.refresher.chore.period"; - static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis - - static final String POLICY_REFRESHER_CHORE_DELAY_KEY = - "hbase.regionserver.quotas.policy.refresher.chore.delay"; - static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute - - static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY = - "hbase.regionserver.quotas.policy.refresher.chore.timeunit"; - static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); - - static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY = - "hbase.regionserver.quotas.policy.refresher.report.percent"; - static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95; - - private final RegionServerSpaceQuotaManager manager; - - public SpaceQuotaViolationPolicyRefresherChore(RegionServerSpaceQuotaManager manager) { - super(SpaceQuotaViolationPolicyRefresherChore.class.getSimpleName(), - manager.getRegionServerServices(), - getPeriod(manager.getRegionServerServices().getConfiguration()), - getInitialDelay(manager.getRegionServerServices().getConfiguration()), - getTimeUnit(manager.getRegionServerServices().getConfiguration())); - this.manager = manager; - } - - @Override - protected void chore() { - // Tables with a policy currently enforced - final Map<TableName, SpaceViolationPolicy> activeViolationPolicies; - // Tables with policies that should be enforced - final Map<TableName, SpaceViolationPolicy> violationPolicies; - try { - // Tables with a policy currently enforced - activeViolationPolicies = manager.getActiveViolationPolicyEnforcements(); - // Tables with policies that should be enforced - violationPolicies = manager.getViolationPoliciesToEnforce(); - } catch (IOException e) { - LOG.warn("Failed to fetch enforced quota violation policies, will retry.", e); - return; - } - // Ensure each policy which should be enacted is enacted. - for (Entry<TableName, SpaceViolationPolicy> entry : violationPolicies.entrySet()) { - final TableName tableName = entry.getKey(); - final SpaceViolationPolicy policyToEnforce = entry.getValue(); - final SpaceViolationPolicy currentPolicy = activeViolationPolicies.get(tableName); - if (currentPolicy != policyToEnforce) { - if (LOG.isTraceEnabled()) { - LOG.trace("Enabling " + policyToEnforce + " on " + tableName); - } - manager.enforceViolationPolicy(tableName, policyToEnforce); - } - } - // Remove policies which should no longer be enforced - Iterator<TableName> iter = activeViolationPolicies.keySet().iterator(); - while (iter.hasNext()) { - final TableName localTableWithPolicy = iter.next(); - if (!violationPolicies.containsKey(localTableWithPolicy)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Removing quota violation policy on " + localTableWithPolicy); - } - manager.disableViolationPolicyEnforcement(localTableWithPolicy); - iter.remove(); - } - } - } - - /** - * Extracts the period for the chore from the configuration. - * - * @param conf The configuration object. - * @return The configured chore period or the default value. - */ - static int getPeriod(Configuration conf) { - return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY, - POLICY_REFRESHER_CHORE_PERIOD_DEFAULT); - } - - /** - * Extracts the initial delay for the chore from the configuration. - * - * @param conf The configuration object. - * @return The configured chore initial delay or the default value. - */ - static long getInitialDelay(Configuration conf) { - return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY, - POLICY_REFRESHER_CHORE_DELAY_DEFAULT); - } - - /** - * Extracts the time unit for the chore period and initial delay from the configuration. The - * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to - * a {@link TimeUnit} value. - * - * @param conf The configuration object. - * @return The configured time unit for the chore period and initial delay or the default value. - */ - static TimeUnit getTimeUnit(Configuration conf) { - return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY, - POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT)); - } - - /** - * Extracts the percent of Regions for a table to have been reported to enable quota violation - * state change. - * - * @param conf The configuration object. - * @return The percent of regions reported to use. - */ - static Double getRegionReportPercent(Configuration conf) { - return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY, - POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/34ba143f/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java new file mode 100644 index 0000000..34b88e5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * RegionServer implementation of {@link SpaceViolationPolicy}. + * + * Implementations must have a public, no-args constructor. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface SpaceViolationPolicyEnforcement { + + /** + * Initializes this policy instance. + */ + void initialize(RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot); + + /** + * Enables this policy. Not all policies have enable actions. + */ + void enable() throws IOException; + + /** + * Disables this policy. Not all policies have disable actions. + */ + void disable() throws IOException; + + /** + * Checks the given {@link Mutation} against <code>this</code> policy. If the + * {@link Mutation} violates the policy, this policy should throw a + * {@link SpaceLimitingException}. + * + * @throws SpaceLimitingException When the given mutation violates this policy. + */ + void check(Mutation m) throws SpaceLimitingException; + + /** + * Returns a logical name for the {@link SpaceViolationPolicy} that this enforcement is for. + */ + String getPolicyName(); + + /** + * Returns whether or not compactions on this table should be disabled for this policy. + */ + boolean areCompactionsDisabled(); + + /** + * Returns the {@link SpaceQuotaSnapshot} <code>this</code> was initialized with. + */ + SpaceQuotaSnapshot getQuotaSnapshot(); + + /** + * Returns whether thet caller should verify any bulk loads against <code>this</code>. + */ + boolean shouldCheckBulkLoads(); + + /** + * Checks the file at the given path against <code>this</code> policy and the current + * {@link SpaceQuotaSnapshot}. If the file would violate the policy, a + * {@link SpaceLimitingException} will be thrown. + * + * @param paths The paths in HDFS to files to be bulk loaded. + */ + void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException; + +}