HBASE-16999 Implement master and regionserver synchronization of quota state
* Implement the RegionServer reading violation from the quota table * Implement the Master reporting violations to the quota table * RegionServers need to track its enforced policies Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/787e6c50 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/787e6c50 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/787e6c50 Branch: refs/heads/HBASE-16961 Commit: 787e6c50af026ae8211b1553339b169f61c6ffac Parents: a00bd18 Author: Josh Elser <els...@apache.org> Authored: Fri Nov 18 15:38:19 2016 -0500 Committer: Josh Elser <els...@apache.org> Committed: Mon Feb 6 13:56:40 2017 -0500 ---------------------------------------------------------------------- .../hadoop/hbase/quotas/QuotaTableUtil.java | 92 ++++++++- .../org/apache/hadoop/hbase/master/HMaster.java | 35 +++- .../hadoop/hbase/quotas/QuotaObserverChore.java | 5 +- .../hbase/quotas/RegionServerQuotaManager.java | 200 ------------------- .../quotas/RegionServerRpcQuotaManager.java | 200 +++++++++++++++++++ .../quotas/RegionServerSpaceQuotaManager.java | 169 ++++++++++++++++ .../quotas/SpaceQuotaViolationNotifier.java | 16 +- .../SpaceQuotaViolationNotifierFactory.java | 62 ++++++ .../SpaceQuotaViolationNotifierForTest.java | 4 + ...SpaceQuotaViolationPolicyRefresherChore.java | 154 ++++++++++++++ .../TableSpaceQuotaViolationNotifier.java | 55 +++++ .../hbase/regionserver/HRegionServer.java | 21 +- .../hbase/regionserver/RSRpcServices.java | 7 +- .../regionserver/RegionServerServices.java | 12 +- .../hadoop/hbase/MockRegionServerServices.java | 10 +- .../hadoop/hbase/master/MockRegionServer.java | 10 +- .../TestQuotaObserverChoreWithMiniCluster.java | 2 + .../hadoop/hbase/quotas/TestQuotaTableUtil.java | 49 +++++ .../hadoop/hbase/quotas/TestQuotaThrottle.java | 4 +- .../TestRegionServerSpaceQuotaManager.java | 127 ++++++++++++ ...SpaceQuotaViolationPolicyRefresherChore.java | 131 ++++++++++++ .../TestTableSpaceQuotaViolationNotifier.java | 144 +++++++++++++ 22 files changed, 1283 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java index 1640ddc..505e94b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java @@ -23,16 +23,20 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -43,7 +47,12 @@ import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; @@ -52,9 +61,8 @@ import org.apache.hadoop.hbase.util.Strings; * <pre> * ROW-KEY FAM/QUAL DATA * n.<namespace> q:s <global-quotas> - * n.<namespace> u:du <size in bytes> * t.<table> q:s <global-quotas> - * t.<table> u:du <size in bytes> + * t.<table> u:v <space violation policy> * u.<user> q:s <global-quotas> * u.<user> q:s.<table> <table-quotas> * u.<user> q:s.<ns>: <namespace-quotas> @@ -73,7 +81,7 @@ public class QuotaTableUtil { protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u"); protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s"); protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s."); - protected static final byte[] QUOTA_QUALIFIER_DISKUSAGE = Bytes.toBytes("du"); + protected static final byte[] QUOTA_QUALIFIER_VIOLATION = Bytes.toBytes("v"); protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u."); protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t."); protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n."); @@ -202,6 +210,51 @@ public class QuotaTableUtil { return filterList; } + /** + * Creates a {@link Scan} which returns only quota violations from the quota table. + */ + public static Scan makeQuotaViolationScan() { + Scan s = new Scan(); + // Limit to "u:v" column + s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION); + // Limit rowspace to the "t:" prefix + s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX); + return s; + } + + /** + * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided + * {@link Result} and adds them to the given {@link Map}. If the result does not contain + * the expected information or the serialized policy in the value is invalid, this method + * will throw an {@link IllegalArgumentException}. + * + * @param result A row from the quota table. + * @param policies A map of policies to add the result of this method into. + */ + public static void extractViolationPolicy( + Result result, Map<TableName,SpaceViolationPolicy> policies) { + byte[] row = Objects.requireNonNull(result).getRow(); + if (null == row) { + throw new IllegalArgumentException("Provided result had a null row"); + } + final TableName targetTableName = getTableFromRowKey(row); + Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION); + if (null == c) { + throw new IllegalArgumentException("Result did not contain the expected column " + + Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_VIOLATION) + + ", " + result.toString()); + } + ByteString buffer = UnsafeByteOperations.unsafeWrap( + c.getValueArray(), c.getValueOffset(), c.getValueLength()); + try { + SpaceQuota quota = SpaceQuota.parseFrom(buffer); + policies.put(targetTableName, getViolationPolicy(quota)); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException( + "Result did not contain a valid SpaceQuota protocol buffer message", e); + } + } + public static interface UserQuotasVisitor { void visitUserQuotas(final String userName, final Quotas quotas) throws IOException; @@ -297,6 +350,26 @@ public class QuotaTableUtil { } } + /** + * Creates a {@link Put} to enable the given <code>policy</code> on the <code>table</code>. + */ + public static Put createEnableViolationPolicyUpdate( + TableName tableName, SpaceViolationPolicy policy) { + Put p = new Put(getTableRowKey(tableName)); + SpaceQuota quota = getProtoViolationPolicy(policy); + p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION, quota.toByteArray()); + return p; + } + + /** + * Creates a {@link Delete} to remove a policy on the given <code>table</code>. + */ + public static Delete createRemoveViolationPolicyUpdate(TableName tableName) { + Delete d = new Delete(getTableRowKey(tableName)); + d.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION); + return d; + } + /* ========================================================================= * Quotas protobuf helpers */ @@ -418,4 +491,17 @@ public class QuotaTableUtil { protected static String getUserFromRowKey(final byte[] key) { return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length); } + + protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) { + return SpaceQuota.newBuilder() + .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy)) + .build(); + } + + protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) { + if (!proto.hasViolationPolicy()) { + throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy."); + } + return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 616c22d..411c33d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -132,8 +132,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.quotas.QuotaObserverChore; +import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifier; -import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierForTest; +import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierFactory; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; @@ -149,11 +150,14 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.master.TableCFsUpdater; import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -861,7 +865,7 @@ public class HMaster extends HRegionServer implements MasterServices { status.setStatus("Starting quota manager"); initQuotaManager(); - this.spaceQuotaViolationNotifier = new SpaceQuotaViolationNotifierForTest(); + this.spaceQuotaViolationNotifier = createQuotaViolationNotifier(); this.quotaObserverChore = new QuotaObserverChore(this); // Start the chore to read the region FS space reports and act on them getChoreService().scheduleChore(quotaObserverChore); @@ -952,6 +956,13 @@ public class HMaster extends HRegionServer implements MasterServices { this.quotaManager = quotaManager; } + SpaceQuotaViolationNotifier createQuotaViolationNotifier() { + SpaceQuotaViolationNotifier notifier = + SpaceQuotaViolationNotifierFactory.getInstance().create(getConfiguration()); + notifier.initialize(getClusterConnection()); + return notifier; + } + boolean isCatalogJanitorEnabled() { return catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false; @@ -2129,6 +2140,26 @@ public class HMaster extends HRegionServer implements MasterServices { protected void run() throws IOException { getMaster().getMasterCoprocessorHost().preEnableTable(tableName); + // Normally, it would make sense for this authorization check to exist inside + // AccessController, but because the authorization check is done based on internal state + // (rather than explicit permissions) we'll do the check here instead of in the + // coprocessor. + MasterQuotaManager quotaManager = getMasterQuotaManager(); + if (null != quotaManager) { + if (quotaManager.isQuotaEnabled()) { + Quotas quotaForTable = QuotaUtil.getTableQuota(getConnection(), tableName); + if (null != quotaForTable && quotaForTable.hasSpace()) { + SpaceViolationPolicy policy = quotaForTable.getSpace().getViolationPolicy(); + if (SpaceViolationPolicy.DISABLE == policy) { + throw new AccessDeniedException("Enabling the table '" + tableName + + "' is disallowed due to a violated space quota."); + } + } + } else if (LOG.isTraceEnabled()) { + LOG.trace("Unable to check for space quotas as the MasterQuotaManager is not enabled"); + } + } + LOG.info(getClientIdAuditPrefix() + " enable " + tableName); // Execute the operation asynchronously - client will check the progress of the operation http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java index 88a6149..8b127d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java @@ -352,14 +352,15 @@ public class QuotaObserverChore extends ScheduledChore { /** * Transitions the given table to violation of its quota, enabling the violation policy. */ - private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy) { + private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy) + throws IOException { this.violationNotifier.transitionTableToViolation(table, violationPolicy); } /** * Transitions the given table to observance of its quota, disabling the violation policy. */ - private void transitionTableToObservance(TableName table) { + private void transitionTableToObservance(TableName table) throws IOException { this.violationNotifier.transitionTableToObservance(table); } http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java deleted file mode 100644 index 4961e06..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.quotas; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.RpcScheduler; -import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.security.UserGroupInformation; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Region Server Quota Manager. - * It is responsible to provide access to the quota information of each user/table. - * - * The direct user of this class is the RegionServer that will get and check the - * user/table quota for each operation (put, get, scan). - * For system tables and user/table with a quota specified, the quota check will be a noop. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class RegionServerQuotaManager { - private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class); - - private final RegionServerServices rsServices; - - private QuotaCache quotaCache = null; - - public RegionServerQuotaManager(final RegionServerServices rsServices) { - this.rsServices = rsServices; - } - - public void start(final RpcScheduler rpcScheduler) throws IOException { - if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { - LOG.info("Quota support disabled"); - return; - } - - LOG.info("Initializing quota support"); - - // Initialize quota cache - quotaCache = new QuotaCache(rsServices); - quotaCache.start(); - } - - public void stop() { - if (isQuotaEnabled()) { - quotaCache.stop("shutdown"); - } - } - - public boolean isQuotaEnabled() { - return quotaCache != null; - } - - @VisibleForTesting - QuotaCache getQuotaCache() { - return quotaCache; - } - - /** - * Returns the quota for an operation. - * - * @param ugi the user that is executing the operation - * @param table the table where the operation will be executed - * @return the OperationQuota - */ - public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { - if (isQuotaEnabled() && !table.isSystemTable()) { - UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); - QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); - boolean useNoop = userLimiter.isBypass(); - if (userQuotaState.hasBypassGlobals()) { - if (LOG.isTraceEnabled()) { - LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); - } - if (!useNoop) { - return new DefaultOperationQuota(userLimiter); - } - } else { - QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); - QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table); - useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass(); - if (LOG.isTraceEnabled()) { - LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + - userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter); - } - if (!useNoop) { - return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter); - } - } - } - return NoopOperationQuota.get(); - } - - /** - * Check the quota for the current (rpc-context) user. - * Returns the OperationQuota used to get the available quota and - * to report the data/usage of the operation. - * @param region the region where the operation will be performed - * @param type the operation type - * @return the OperationQuota - * @throws ThrottlingException if the operation cannot be executed due to quota exceeded. - */ - public OperationQuota checkQuota(final Region region, - final OperationQuota.OperationType type) throws IOException, ThrottlingException { - switch (type) { - case SCAN: return checkQuota(region, 0, 0, 1); - case GET: return checkQuota(region, 0, 1, 0); - case MUTATE: return checkQuota(region, 1, 0, 0); - } - throw new RuntimeException("Invalid operation type: " + type); - } - - /** - * Check the quota for the current (rpc-context) user. - * Returns the OperationQuota used to get the available quota and - * to report the data/usage of the operation. - * @param region the region where the operation will be performed - * @param actions the "multi" actions to perform - * @return the OperationQuota - * @throws ThrottlingException if the operation cannot be executed due to quota exceeded. - */ - public OperationQuota checkQuota(final Region region, - final List<ClientProtos.Action> actions) throws IOException, ThrottlingException { - int numWrites = 0; - int numReads = 0; - for (final ClientProtos.Action action: actions) { - if (action.hasMutation()) { - numWrites++; - } else if (action.hasGet()) { - numReads++; - } - } - return checkQuota(region, numWrites, numReads, 0); - } - - /** - * Check the quota for the current (rpc-context) user. - * Returns the OperationQuota used to get the available quota and - * to report the data/usage of the operation. - * @param region the region where the operation will be performed - * @param numWrites number of writes to perform - * @param numReads number of short-reads to perform - * @param numScans number of scan to perform - * @return the OperationQuota - * @throws ThrottlingException if the operation cannot be executed due to quota exceeded. - */ - private OperationQuota checkQuota(final Region region, - final int numWrites, final int numReads, final int numScans) - throws IOException, ThrottlingException { - User user = RpcServer.getRequestUser(); - UserGroupInformation ugi; - if (user != null) { - ugi = user.getUGI(); - } else { - ugi = User.getCurrent().getUGI(); - } - TableName table = region.getTableDesc().getTableName(); - - OperationQuota quota = getQuota(ugi, table); - try { - quota.checkQuota(numWrites, numReads, numScans); - } catch (ThrottlingException e) { - LOG.debug("Throttling exception for user=" + ugi.getUserName() + - " table=" + table + " numWrites=" + numWrites + - " numReads=" + numReads + " numScans=" + numScans + - ": " + e.getMessage()); - throw e; - } - return quota; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java new file mode 100644 index 0000000..756251a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Region Server Quota Manager. + * It is responsible to provide access to the quota information of each user/table. + * + * The direct user of this class is the RegionServer that will get and check the + * user/table quota for each operation (put, get, scan). + * For system tables and user/table with a quota specified, the quota check will be a noop. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RegionServerRpcQuotaManager { + private static final Log LOG = LogFactory.getLog(RegionServerRpcQuotaManager.class); + + private final RegionServerServices rsServices; + + private QuotaCache quotaCache = null; + + public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { + this.rsServices = rsServices; + } + + public void start(final RpcScheduler rpcScheduler) throws IOException { + if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { + LOG.info("Quota support disabled"); + return; + } + + LOG.info("Initializing RPC quota support"); + + // Initialize quota cache + quotaCache = new QuotaCache(rsServices); + quotaCache.start(); + } + + public void stop() { + if (isQuotaEnabled()) { + quotaCache.stop("shutdown"); + } + } + + public boolean isQuotaEnabled() { + return quotaCache != null; + } + + @VisibleForTesting + QuotaCache getQuotaCache() { + return quotaCache; + } + + /** + * Returns the quota for an operation. + * + * @param ugi the user that is executing the operation + * @param table the table where the operation will be executed + * @return the OperationQuota + */ + public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { + if (isQuotaEnabled() && !table.isSystemTable()) { + UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); + QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); + boolean useNoop = userLimiter.isBypass(); + if (userQuotaState.hasBypassGlobals()) { + if (LOG.isTraceEnabled()) { + LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); + } + if (!useNoop) { + return new DefaultOperationQuota(userLimiter); + } + } else { + QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); + QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table); + useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass(); + if (LOG.isTraceEnabled()) { + LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + + userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter); + } + if (!useNoop) { + return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter); + } + } + } + return NoopOperationQuota.get(); + } + + /** + * Check the quota for the current (rpc-context) user. + * Returns the OperationQuota used to get the available quota and + * to report the data/usage of the operation. + * @param region the region where the operation will be performed + * @param type the operation type + * @return the OperationQuota + * @throws ThrottlingException if the operation cannot be executed due to quota exceeded. + */ + public OperationQuota checkQuota(final Region region, + final OperationQuota.OperationType type) throws IOException, ThrottlingException { + switch (type) { + case SCAN: return checkQuota(region, 0, 0, 1); + case GET: return checkQuota(region, 0, 1, 0); + case MUTATE: return checkQuota(region, 1, 0, 0); + } + throw new RuntimeException("Invalid operation type: " + type); + } + + /** + * Check the quota for the current (rpc-context) user. + * Returns the OperationQuota used to get the available quota and + * to report the data/usage of the operation. + * @param region the region where the operation will be performed + * @param actions the "multi" actions to perform + * @return the OperationQuota + * @throws ThrottlingException if the operation cannot be executed due to quota exceeded. + */ + public OperationQuota checkQuota(final Region region, + final List<ClientProtos.Action> actions) throws IOException, ThrottlingException { + int numWrites = 0; + int numReads = 0; + for (final ClientProtos.Action action: actions) { + if (action.hasMutation()) { + numWrites++; + } else if (action.hasGet()) { + numReads++; + } + } + return checkQuota(region, numWrites, numReads, 0); + } + + /** + * Check the quota for the current (rpc-context) user. + * Returns the OperationQuota used to get the available quota and + * to report the data/usage of the operation. + * @param region the region where the operation will be performed + * @param numWrites number of writes to perform + * @param numReads number of short-reads to perform + * @param numScans number of scan to perform + * @return the OperationQuota + * @throws ThrottlingException if the operation cannot be executed due to quota exceeded. + */ + private OperationQuota checkQuota(final Region region, + final int numWrites, final int numReads, final int numScans) + throws IOException, ThrottlingException { + User user = RpcServer.getRequestUser(); + UserGroupInformation ugi; + if (user != null) { + ugi = user.getUGI(); + } else { + ugi = User.getCurrent().getUGI(); + } + TableName table = region.getTableDesc().getTableName(); + + OperationQuota quota = getQuota(ugi, table); + try { + quota.checkQuota(numWrites, numReads, numScans); + } catch (ThrottlingException e) { + LOG.debug("Throttling exception for user=" + ugi.getUserName() + + " table=" + table + " numWrites=" + numWrites + + " numReads=" + numReads + " numScans=" + numScans + + ": " + e.getMessage()); + throw e; + } + return quota; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java new file mode 100644 index 0000000..9a8edb9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A manager for filesystem space quotas in the RegionServer. + * + * This class is responsible for reading quota violation policies from the quota + * table and then enacting them on the given table. + */ +@InterfaceAudience.Private +public class RegionServerSpaceQuotaManager { + private static final Log LOG = LogFactory.getLog(RegionServerSpaceQuotaManager.class); + + private final RegionServerServices rsServices; + + private SpaceQuotaViolationPolicyRefresherChore spaceQuotaRefresher; + private Map<TableName,SpaceViolationPolicy> enforcedPolicies; + private boolean started = false; + + public RegionServerSpaceQuotaManager(RegionServerServices rsServices) { + this.rsServices = Objects.requireNonNull(rsServices); + } + + public synchronized void start() throws IOException { + if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { + LOG.info("Quota support disabled, not starting space quota manager."); + return; + } + + spaceQuotaRefresher = new SpaceQuotaViolationPolicyRefresherChore(this); + enforcedPolicies = new HashMap<>(); + started = true; + } + + public synchronized void stop() { + if (null != spaceQuotaRefresher) { + spaceQuotaRefresher.cancel(); + spaceQuotaRefresher = null; + } + started = false; + } + + /** + * @return if the {@code Chore} has been started. + */ + public boolean isStarted() { + return started; + } + + Connection getConnection() { + return rsServices.getConnection(); + } + + /** + * Returns the collection of tables which have quota violation policies enforced on + * this RegionServer. + */ + public synchronized Map<TableName,SpaceViolationPolicy> getActiveViolationPolicyEnforcements() + throws IOException { + return new HashMap<>(this.enforcedPolicies); + } + + /** + * Wrapper around {@link QuotaTableUtil#extractViolationPolicy(Result, Map)} for testing. + */ + void extractViolationPolicy(Result result, Map<TableName,SpaceViolationPolicy> activePolicies) { + QuotaTableUtil.extractViolationPolicy(result, activePolicies); + } + + /** + * Reads all quota violation policies which are to be enforced from the quota table. + * + * @return The collection of tables which are in violation of their quota and the policy which + * should be enforced. + */ + public Map<TableName, SpaceViolationPolicy> getViolationPoliciesToEnforce() throws IOException { + try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME); + ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) { + Map<TableName,SpaceViolationPolicy> activePolicies = new HashMap<>(); + for (Result result : scanner) { + try { + extractViolationPolicy(result, activePolicies); + } catch (IllegalArgumentException e) { + final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow()); + LOG.error(msg, e); + throw new IOException(msg, e); + } + } + return activePolicies; + } + } + + /** + * Enforces the given violationPolicy on the given table in this RegionServer. + */ + synchronized void enforceViolationPolicy( + TableName tableName, SpaceViolationPolicy violationPolicy) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "Enabling violation policy enforcement on " + tableName + + " with policy " + violationPolicy); + } + // Enact the policy + enforceOnRegionServer(tableName, violationPolicy); + // Publicize our enacting of the policy + enforcedPolicies.put(tableName, violationPolicy); + } + + /** + * Enacts the given violation policy on this table in the RegionServer. + */ + void enforceOnRegionServer(TableName tableName, SpaceViolationPolicy violationPolicy) { + throw new UnsupportedOperationException("TODO"); + } + + /** + * Disables enforcement on any violation policy on the given <code>tableName</code>. + */ + synchronized void disableViolationPolicyEnforcement(TableName tableName) { + if (LOG.isTraceEnabled()) { + LOG.trace("Disabling violation policy enforcement on " + tableName); + } + disableOnRegionServer(tableName); + enforcedPolicies.remove(tableName); + } + + /** + * Disables any violation policy on this table in the RegionServer. + */ + void disableOnRegionServer(TableName tableName) { + throw new UnsupportedOperationException("TODO"); + } + + RegionServerServices getRegionServerServices() { + return rsServices; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java index bccf519..261dea7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java @@ -16,29 +16,39 @@ */ package org.apache.hadoop.hbase.quotas; +import java.io.IOException; + import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; /** * An interface which abstract away the action taken to enable or disable - * a space quota violation policy across the HBase cluster. + * a space quota violation policy across the HBase cluster. Implementations + * must have a no-args constructor. */ @InterfaceAudience.Private public interface SpaceQuotaViolationNotifier { /** + * Initializes the notifier. + */ + void initialize(Connection conn); + + /** * Instructs the cluster that the given table is in violation of a space quota. The * provided violation policy is the action which should be taken on the table. * * @param tableName The name of the table in violation of the quota. * @param violationPolicy The policy which should be enacted on the table. */ - void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy); + void transitionTableToViolation( + TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException; /** * Instructs the cluster that the given table is in observance of any applicable space quota. * * @param tableName The name of the table in observance. */ - void transitionTableToObservance(TableName tableName); + void transitionTableToObservance(TableName tableName) throws IOException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java new file mode 100644 index 0000000..43f5513 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.util.Objects; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Factory for creating {@link SpaceQuotaViolationNotifier} implementations. Implementations + * must have a no-args constructor. + */ +@InterfaceAudience.Private +public class SpaceQuotaViolationNotifierFactory { + private static final SpaceQuotaViolationNotifierFactory INSTANCE = + new SpaceQuotaViolationNotifierFactory(); + + public static final String VIOLATION_NOTIFIER_KEY = "hbase.master.quota.violation.notifier.impl"; + public static final Class<? extends SpaceQuotaViolationNotifier> VIOLATION_NOTIFIER_DEFAULT = + SpaceQuotaViolationNotifierForTest.class; + + // Private + private SpaceQuotaViolationNotifierFactory() {} + + public static SpaceQuotaViolationNotifierFactory getInstance() { + return INSTANCE; + } + + /** + * Instantiates the {@link SpaceQuotaViolationNotifier} implementation as defined in the + * configuration provided. + * + * @param conf Configuration object + * @return The SpaceQuotaViolationNotifier implementation + * @throws IllegalArgumentException if the class could not be instantiated + */ + public SpaceQuotaViolationNotifier create(Configuration conf) { + Class<? extends SpaceQuotaViolationNotifier> clz = Objects.requireNonNull(conf) + .getClass(VIOLATION_NOTIFIER_KEY, VIOLATION_NOTIFIER_DEFAULT, + SpaceQuotaViolationNotifier.class); + try { + return clz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("Failed to instantiate the implementation", e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java index 4ab9834..65dc979 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; /** * A SpaceQuotaViolationNotifier implementation for verifying testing. @@ -31,6 +32,9 @@ public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNo private final Map<TableName,SpaceViolationPolicy> tablesInViolation = new HashMap<>(); @Override + public void initialize(Connection conn) {} + + @Override public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) { tablesInViolation.put(tableName, violationPolicy); } http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java new file mode 100644 index 0000000..778ea0b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A {@link ScheduledChore} which periodically updates a local copy of tables which have + * space quota violation policies enacted on them. + */ +@InterfaceAudience.Private +public class SpaceQuotaViolationPolicyRefresherChore extends ScheduledChore { + private static final Log LOG = LogFactory.getLog(SpaceQuotaViolationPolicyRefresherChore.class); + + static final String POLICY_REFRESHER_CHORE_PERIOD_KEY = + "hbase.regionserver.quotas.policy.refresher.chore.period"; + static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis + + static final String POLICY_REFRESHER_CHORE_DELAY_KEY = + "hbase.regionserver.quotas.policy.refresher.chore.delay"; + static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute + + static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY = + "hbase.regionserver.quotas.policy.refresher.chore.timeunit"; + static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); + + static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY = + "hbase.regionserver.quotas.policy.refresher.report.percent"; + static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95; + + private final RegionServerSpaceQuotaManager manager; + + public SpaceQuotaViolationPolicyRefresherChore(RegionServerSpaceQuotaManager manager) { + super(SpaceQuotaViolationPolicyRefresherChore.class.getSimpleName(), + manager.getRegionServerServices(), + getPeriod(manager.getRegionServerServices().getConfiguration()), + getInitialDelay(manager.getRegionServerServices().getConfiguration()), + getTimeUnit(manager.getRegionServerServices().getConfiguration())); + this.manager = manager; + } + + @Override + protected void chore() { + // Tables with a policy currently enforced + final Map<TableName, SpaceViolationPolicy> activeViolationPolicies; + // Tables with policies that should be enforced + final Map<TableName, SpaceViolationPolicy> violationPolicies; + try { + // Tables with a policy currently enforced + activeViolationPolicies = manager.getActiveViolationPolicyEnforcements(); + // Tables with policies that should be enforced + violationPolicies = manager.getViolationPoliciesToEnforce(); + } catch (IOException e) { + LOG.warn("Failed to fetch enforced quota violation policies, will retry.", e); + return; + } + // Ensure each policy which should be enacted is enacted. + for (Entry<TableName, SpaceViolationPolicy> entry : violationPolicies.entrySet()) { + final TableName tableName = entry.getKey(); + final SpaceViolationPolicy policyToEnforce = entry.getValue(); + final SpaceViolationPolicy currentPolicy = activeViolationPolicies.get(tableName); + if (currentPolicy != policyToEnforce) { + if (LOG.isTraceEnabled()) { + LOG.trace("Enabling " + policyToEnforce + " on " + tableName); + } + manager.enforceViolationPolicy(tableName, policyToEnforce); + } + } + // Remove policies which should no longer be enforced + Iterator<TableName> iter = activeViolationPolicies.keySet().iterator(); + while (iter.hasNext()) { + final TableName localTableWithPolicy = iter.next(); + if (!violationPolicies.containsKey(localTableWithPolicy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Removing quota violation policy on " + localTableWithPolicy); + } + manager.disableViolationPolicyEnforcement(localTableWithPolicy); + iter.remove(); + } + } + } + + /** + * Extracts the period for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore period or the default value. + */ + static int getPeriod(Configuration conf) { + return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY, + POLICY_REFRESHER_CHORE_PERIOD_DEFAULT); + } + + /** + * Extracts the initial delay for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore initial delay or the default value. + */ + static long getInitialDelay(Configuration conf) { + return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY, + POLICY_REFRESHER_CHORE_DELAY_DEFAULT); + } + + /** + * Extracts the time unit for the chore period and initial delay from the configuration. The + * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to + * a {@link TimeUnit} value. + * + * @param conf The configuration object. + * @return The configured time unit for the chore period and initial delay or the default value. + */ + static TimeUnit getTimeUnit(Configuration conf) { + return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY, + POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT)); + } + + /** + * Extracts the percent of Regions for a table to have been reported to enable quota violation + * state change. + * + * @param conf The configuration object. + * @return The percent of regions reported to use. + */ + static Double getRegionReportPercent(Configuration conf) { + return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY, + POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java new file mode 100644 index 0000000..a8b1c55 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; + +/** + * A {@link SpaceQuotaViolationNotifier} which uses the hbase:quota table. + */ +public class TableSpaceQuotaViolationNotifier implements SpaceQuotaViolationNotifier { + + private Connection conn; + + @Override + public void transitionTableToViolation( + TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException { + final Put p = QuotaTableUtil.createEnableViolationPolicyUpdate(tableName, violationPolicy); + try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { + quotaTable.put(p); + } + } + + @Override + public void transitionTableToObservance(TableName tableName) throws IOException { + final Delete d = QuotaTableUtil.createRemoveViolationPolicyUpdate(tableName); + try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { + quotaTable.delete(d); + } + } + + @Override + public void initialize(Connection conn) { + this.conn = conn; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d04243c..003175f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -120,7 +120,8 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.mob.MobCacheConfig; import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; -import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; @@ -475,7 +476,8 @@ public class HRegionServer extends HasThread implements private RegionServerProcedureManagerHost rspmHost; - private RegionServerQuotaManager rsQuotaManager; + private RegionServerRpcQuotaManager rsQuotaManager; + private RegionServerSpaceQuotaManager rsSpaceQuotaManager; /** * Nonce manager. Nonces are used to make operations like increment and append idempotent @@ -924,7 +926,8 @@ public class HRegionServer extends HasThread implements } // Setup the Quota Manager - rsQuotaManager = new RegionServerQuotaManager(this); + rsQuotaManager = new RegionServerRpcQuotaManager(this); + rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); this.fsUtilizationChore = new FileSystemUtilizationChore(this); @@ -998,6 +1001,7 @@ public class HRegionServer extends HasThread implements // Start the Quota Manager rsQuotaManager.start(getRpcServer().getScheduler()); + rsSpaceQuotaManager.start(); } // We registered with the Master. Go into run mode. @@ -1089,6 +1093,10 @@ public class HRegionServer extends HasThread implements if (rsQuotaManager != null) { rsQuotaManager.stop(); } + if (rsSpaceQuotaManager != null) { + rsSpaceQuotaManager.stop(); + rsSpaceQuotaManager = null; + } // Stop the snapshot and other procedure handlers, forcefully killing all running tasks if (rspmHost != null) { @@ -2853,7 +2861,7 @@ public class HRegionServer extends HasThread implements } @Override - public RegionServerQuotaManager getRegionServerQuotaManager() { + public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { return rsQuotaManager; } @@ -3711,4 +3719,9 @@ public class HRegionServer extends HasThread implements return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator()) .regionLock(regionInfos, description, abort); } + + @Override + public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { + return this.rsSpaceQuotaManager; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index a072dce..0b80e11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -98,7 +98,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.quotas.OperationQuota; -import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Leases.Lease; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; @@ -196,6 +196,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; +import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.DNS; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -1259,8 +1260,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return regionServer.getConfiguration(); } - private RegionServerQuotaManager getQuotaManager() { - return regionServer.getRegionServerQuotaManager(); + private RegionServerRpcQuotaManager getQuotaManager() { + return regionServer.getRegionServerRpcQuotaManager(); } void start() { http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index c92124c..8eab22f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -35,7 +35,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.wal.WAL; import org.apache.zookeeper.KeeperException; @@ -78,9 +79,9 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi RegionServerAccounting getRegionServerAccounting(); /** - * @return RegionServer's instance of {@link RegionServerQuotaManager} + * @return RegionServer's instance of {@link RegionServerRpcQuotaManager} */ - RegionServerQuotaManager getRegionServerQuotaManager(); + RegionServerRpcQuotaManager getRegionServerRpcQuotaManager(); /** * @return RegionServer's instance of {@link SecureBulkLoadManager} @@ -88,6 +89,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi SecureBulkLoadManager getSecureBulkLoadManager(); /** + * @return RegionServer's instance of {@link RegionServerSpaceQuotaManager} + */ + RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager(); + + /** * Context for postOpenDeployTasks(). */ class PostOpenDeployContext { http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 5e2a70f..8d20466 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -38,7 +38,8 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; @@ -189,7 +190,7 @@ public class MockRegionServerServices implements RegionServerServices { } @Override - public RegionServerQuotaManager getRegionServerQuotaManager() { + public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { return null; } @@ -356,4 +357,9 @@ public class MockRegionServerServices implements RegionServerServices { public SecureBulkLoadManager getSecureBulkLoadManager() { return null; } + + @Override + public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 467d4a5..94ecf09 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -101,7 +101,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -334,7 +335,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override - public RegionServerQuotaManager getRegionServerQuotaManager() { + public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { return null; } @@ -719,4 +720,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public SecureBulkLoadManager getSecureBulkLoadManager() { return null; } + + @Override + public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java index 98236c2..c493b25 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java @@ -94,6 +94,8 @@ public class TestQuotaObserverChoreWithMiniCluster { conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000); conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000); conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + conf.setClass(SpaceQuotaViolationNotifierFactory.VIOLATION_NOTIFIER_KEY, + SpaceQuotaViolationNotifierForTest.class, SpaceQuotaViolationNotifier.class); TEST_UTIL.startMiniCluster(1); } http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java index 5306be9..51ee514 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java @@ -21,6 +21,10 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -28,6 +32,10 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle; @@ -37,8 +45,10 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; /** * Test the quota table helpers (e.g. CRUD operations) @@ -48,6 +58,10 @@ public class TestQuotaTableUtil { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private Connection connection; + private int tableNameCounter; + + @Rule + public TestName testName = new TestName(); @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -70,6 +84,7 @@ public class TestQuotaTableUtil { @Before public void before() throws IOException { this.connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + this.tableNameCounter = 0; } @After @@ -179,4 +194,38 @@ public class TestQuotaTableUtil { resQuotaNS = QuotaUtil.getUserQuota(this.connection, user, namespace); assertEquals(null, resQuotaNS); } + + @Test + public void testSerDeViolationPolicies() throws Exception { + final TableName tn1 = getUniqueTableName(); + final SpaceViolationPolicy policy1 = SpaceViolationPolicy.DISABLE; + final TableName tn2 = getUniqueTableName(); + final SpaceViolationPolicy policy2 = SpaceViolationPolicy.NO_INSERTS; + final TableName tn3 = getUniqueTableName(); + final SpaceViolationPolicy policy3 = SpaceViolationPolicy.NO_WRITES; + List<Put> puts = new ArrayList<>(); + puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn1, policy1)); + puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn2, policy2)); + puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn3, policy3)); + final Map<TableName,SpaceViolationPolicy> expectedPolicies = new HashMap<>(); + expectedPolicies.put(tn1, policy1); + expectedPolicies.put(tn2, policy2); + expectedPolicies.put(tn3, policy3); + + final Map<TableName,SpaceViolationPolicy> actualPolicies = new HashMap<>(); + try (Table quotaTable = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { + quotaTable.put(puts); + ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan()); + for (Result r : scanner) { + QuotaTableUtil.extractViolationPolicy(r, actualPolicies); + } + scanner.close(); + } + + assertEquals(expectedPolicies, actualPolicies); + } + + private TableName getUniqueTableName() { + return TableName.valueOf(testName.getMethodName() + "_" + tableNameCounter++); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java index 0c06588..ffd6443 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java @@ -105,7 +105,7 @@ public class TestQuotaThrottle { @After public void tearDown() throws Exception { for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) { - RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager(); + RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager(); QuotaCache quotaCache = quotaManager.getQuotaCache(); quotaCache.getNamespaceQuotaCache().clear(); quotaCache.getTableQuotaCache().clear(); @@ -557,7 +557,7 @@ public class TestQuotaThrottle { boolean nsLimiter, final TableName... tables) throws Exception { envEdge.incValue(2 * REFRESH_TIME); for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) { - RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager(); + RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager(); QuotaCache quotaCache = quotaManager.getQuotaCache(); quotaCache.triggerCacheRefresh(); http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java new file mode 100644 index 0000000..e5ab317 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.apache.hadoop.hbase.util.Bytes.toBytes; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test class for {@link RegionServerSpaceQuotaManager}. + */ +@Category(SmallTests.class) +public class TestRegionServerSpaceQuotaManager { + + private RegionServerSpaceQuotaManager quotaManager; + private Connection conn; + private Table quotaTable; + private ResultScanner scanner; + + @Before + @SuppressWarnings("unchecked") + public void setup() throws Exception { + quotaManager = mock(RegionServerSpaceQuotaManager.class); + conn = mock(Connection.class); + quotaTable = mock(Table.class); + scanner = mock(ResultScanner.class); + // Call the real getViolationPoliciesToEnforce() + when(quotaManager.getViolationPoliciesToEnforce()).thenCallRealMethod(); + // Mock out creating a scanner + when(quotaManager.getConnection()).thenReturn(conn); + when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable); + when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner); + // Mock out the static method call with some indirection + doAnswer(new Answer<Void>(){ + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Result result = invocation.getArgumentAt(0, Result.class); + Map<TableName,SpaceViolationPolicy> policies = invocation.getArgumentAt(1, Map.class); + QuotaTableUtil.extractViolationPolicy(result, policies); + return null; + } + }).when(quotaManager).extractViolationPolicy(any(Result.class), any(Map.class)); + } + + @Test + public void testMissingAllColumns() { + List<Result> results = new ArrayList<>(); + results.add(Result.create(Collections.emptyList())); + when(scanner.iterator()).thenReturn(results.iterator()); + try { + quotaManager.getViolationPoliciesToEnforce(); + fail("Expected an IOException, but did not receive one."); + } catch (IOException e) { + // Expected an error because we had no cells in the row. + // This should only happen due to programmer error. + } + } + + @Test + public void testMissingDesiredColumn() { + List<Result> results = new ArrayList<>(); + // Give a column that isn't the one we want + Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("q"), toBytes("s"), new byte[0]); + results.add(Result.create(Collections.singletonList(c))); + when(scanner.iterator()).thenReturn(results.iterator()); + try { + quotaManager.getViolationPoliciesToEnforce(); + fail("Expected an IOException, but did not receive one."); + } catch (IOException e) { + // Expected an error because we were missing the column we expected in this row. + // This should only happen due to programmer error. + } + } + + @Test + public void testParsingError() { + List<Result> results = new ArrayList<>(); + Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("u"), toBytes("v"), new byte[0]); + results.add(Result.create(Collections.singletonList(c))); + when(scanner.iterator()).thenReturn(results.iterator()); + try { + quotaManager.getViolationPoliciesToEnforce(); + fail("Expected an IOException, but did not receive one."); + } catch (IOException e) { + // We provided a garbage serialized protobuf message (empty byte array), this should + // in turn throw an IOException + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/787e6c50/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java new file mode 100644 index 0000000..160de46 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test class for {@link SpaceQuotaViolationPolicyRefresherChore}. + */ +@Category(SmallTests.class) +public class TestSpaceQuotaViolationPolicyRefresherChore { + + private RegionServerSpaceQuotaManager manager; + private RegionServerServices rss; + private SpaceQuotaViolationPolicyRefresherChore chore; + private Configuration conf; + + @Before + public void setup() { + conf = HBaseConfiguration.create(); + rss = mock(RegionServerServices.class); + manager = mock(RegionServerSpaceQuotaManager.class); + when(manager.getRegionServerServices()).thenReturn(rss); + when(rss.getConfiguration()).thenReturn(conf); + chore = new SpaceQuotaViolationPolicyRefresherChore(manager); + } + + @Test + public void testPoliciesAreEnforced() throws IOException { + final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>(); + policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE); + policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS); + policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES); + policiesToEnforce.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES_COMPACTIONS); + + // No active enforcements + when(manager.getActiveViolationPolicyEnforcements()).thenReturn(Collections.emptyMap()); + // Policies to enforce + when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce); + + chore.chore(); + + for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) { + // Ensure we enforce the policy + verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue()); + // Don't disable any policies + verify(manager, never()).disableViolationPolicyEnforcement(entry.getKey()); + } + } + + @Test + public void testOldPoliciesAreRemoved() throws IOException { + final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>(); + policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE); + policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS); + + final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>(); + previousPolicies.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES); + previousPolicies.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES); + + // No active enforcements + when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies); + // Policies to enforce + when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce); + + chore.chore(); + + for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) { + verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue()); + } + + for (Entry<TableName,SpaceViolationPolicy> entry : previousPolicies.entrySet()) { + verify(manager).disableViolationPolicyEnforcement(entry.getKey()); + } + } + + @Test + public void testNewPolicyOverridesOld() throws IOException { + final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>(); + policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE); + policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_WRITES); + policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_INSERTS); + + final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>(); + previousPolicies.put(TableName.valueOf("table1"), SpaceViolationPolicy.NO_WRITES); + + // No active enforcements + when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies); + // Policies to enforce + when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce); + + chore.chore(); + + for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) { + verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue()); + } + verify(manager, never()).disableViolationPolicyEnforcement(TableName.valueOf("table1")); + } +}