http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java new file mode 100644 index 0000000..6b754b9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcementFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; +import org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement; +import org.apache.hadoop.hbase.quotas.policies.DisableTableViolationPolicyEnforcement; +import org.apache.hadoop.hbase.quotas.policies.NoInsertsViolationPolicyEnforcement; +import org.apache.hadoop.hbase.quotas.policies.NoWritesCompactionsViolationPolicyEnforcement; +import org.apache.hadoop.hbase.quotas.policies.NoWritesViolationPolicyEnforcement; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * A factory class for instantiating {@link SpaceViolationPolicyEnforcement} instances. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SpaceViolationPolicyEnforcementFactory { + + private static final SpaceViolationPolicyEnforcementFactory INSTANCE = + new SpaceViolationPolicyEnforcementFactory(); + + private SpaceViolationPolicyEnforcementFactory() {} + + /** + * Returns an instance of this factory. + */ + public static SpaceViolationPolicyEnforcementFactory getInstance() { + return INSTANCE; + } + + /** + * Constructs the appropriate {@link SpaceViolationPolicyEnforcement} for tables that are + * in violation of their space quota. + */ + public SpaceViolationPolicyEnforcement create( + RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) { + SpaceViolationPolicyEnforcement enforcement; + SpaceQuotaStatus status = snapshot.getQuotaStatus(); + if (!status.isInViolation()) { + throw new IllegalArgumentException(tableName + " is not in violation. 
Snapshot=" + snapshot); + } + switch (status.getPolicy()) { + case DISABLE: + enforcement = new DisableTableViolationPolicyEnforcement(); + break; + case NO_WRITES_COMPACTIONS: + enforcement = new NoWritesCompactionsViolationPolicyEnforcement(); + break; + case NO_WRITES: + enforcement = new NoWritesViolationPolicyEnforcement(); + break; + case NO_INSERTS: + enforcement = new NoInsertsViolationPolicyEnforcement(); + break; + default: + throw new IllegalArgumentException("Unhandled SpaceViolationPolicy: " + status.getPolicy()); + } + enforcement.initialize(rss, tableName, snapshot); + return enforcement; + } + + /** + * Creates the "default" {@link SpaceViolationPolicyEnforcement} for a table that isn't in + * violation. This gives uniform policy checking for tables both in and out of violation. + */ + public SpaceViolationPolicyEnforcement createWithoutViolation( + RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) { + SpaceQuotaStatus status = snapshot.getQuotaStatus(); + if (status.isInViolation()) { + throw new IllegalArgumentException( + tableName + " is in violation. Logic error. Snapshot=" + snapshot); + } + BulkLoadVerifyingViolationPolicyEnforcement enforcement = new BulkLoadVerifyingViolationPolicyEnforcement(); + enforcement.initialize(rss, tableName, snapshot); + return enforcement; + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java new file mode 100644 index 0000000..e196354 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaSnapshotStore.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +/** + * {@link QuotaSnapshotStore} for tables. + */ +@InterfaceAudience.Private +public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> { + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadLock rlock = lock.readLock(); + private final WriteLock wlock = lock.writeLock(); + + private final Connection conn; + private final QuotaObserverChore chore; + private Map<HRegionInfo,Long> regionUsage; + + public TableQuotaSnapshotStore(Connection conn, QuotaObserverChore chore, Map<HRegionInfo,Long> regionUsage) { + this.conn = Objects.requireNonNull(conn); + this.chore = Objects.requireNonNull(chore); + this.regionUsage = Objects.requireNonNull(regionUsage); + } + + @Override + public SpaceQuota getSpaceQuota(TableName subject) throws IOException { + Quotas quotas = getQuotaForTable(subject); + if (null != quotas && quotas.hasSpace()) { + return quotas.getSpace(); + } + return null; + } + /** + * Fetches the table quota. Visible for mocking/testing. 
+ */ + Quotas getQuotaForTable(TableName table) throws IOException { + return QuotaTableUtil.getTableQuota(conn, table); + } + + @Override + public SpaceQuotaSnapshot getCurrentState(TableName table) { + // Defer the "current state" to the chore + return chore.getTableQuotaSnapshot(table); + } + + @Override + public SpaceQuotaSnapshot getTargetState(TableName table, SpaceQuota spaceQuota) { + rlock.lock(); + try { + final long sizeLimitInBytes = spaceQuota.getSoftLimit(); + long sum = 0L; + for (Entry<HRegionInfo,Long> entry : filterBySubject(table)) { + sum += entry.getValue(); + } + // Observance is defined as the size of the table being no greater than the limit + SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation() + : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy())); + return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes); + } finally { + rlock.unlock(); + } + } + + @Override + public Iterable<Entry<HRegionInfo,Long>> filterBySubject(TableName table) { + rlock.lock(); + try { + return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<HRegionInfo,Long>>() { + @Override + public boolean apply(Entry<HRegionInfo,Long> input) { + return table.equals(input.getKey().getTable()); + } + }); + } finally { + rlock.unlock(); + } + } + + @Override + public void setCurrentState(TableName table, SpaceQuotaSnapshot snapshot) { + // Defer the "current state" to the chore + this.chore.setTableQuotaViolation(table, snapshot); + } + + @Override + public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) { + wlock.lock(); + try { + this.regionUsage = Objects.requireNonNull(regionUsage); + } finally { + wlock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java ---------------------------------------------------------------------- diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java deleted file mode 100644 index 6aba1cf..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.quotas; - -import java.io.IOException; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import java.util.Map.Entry; - -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; - -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; - -/** - * {@link QuotaViolationStore} for tables. - */ -@InterfaceAudience.Private -public class TableQuotaViolationStore implements QuotaViolationStore<TableName> { - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final ReadLock rlock = lock.readLock(); - private final WriteLock wlock = lock.writeLock(); - - private final Connection conn; - private final QuotaObserverChore chore; - private Map<HRegionInfo,Long> regionUsage; - - public TableQuotaViolationStore(Connection conn, QuotaObserverChore chore, Map<HRegionInfo,Long> regionUsage) { - this.conn = Objects.requireNonNull(conn); - this.chore = Objects.requireNonNull(chore); - this.regionUsage = Objects.requireNonNull(regionUsage); - } - - @Override - public SpaceQuota getSpaceQuota(TableName subject) throws IOException { - Quotas quotas = getQuotaForTable(subject); - if (null != quotas && quotas.hasSpace()) { - return quotas.getSpace(); - } - return null; - } - /** - * Fetches the table quota. Visible for mocking/testing. 
- */ - Quotas getQuotaForTable(TableName table) throws IOException { - return QuotaTableUtil.getTableQuota(conn, table); - } - - @Override - public ViolationState getCurrentState(TableName table) { - // Defer the "current state" to the chore - return chore.getTableQuotaViolation(table); - } - - @Override - public ViolationState getTargetState(TableName table, SpaceQuota spaceQuota) { - rlock.lock(); - try { - final long sizeLimitInBytes = spaceQuota.getSoftLimit(); - long sum = 0L; - for (Entry<HRegionInfo,Long> entry : filterBySubject(table)) { - sum += entry.getValue(); - if (sum > sizeLimitInBytes) { - // Short-circuit early - return ViolationState.IN_VIOLATION; - } - } - // Observance is defined as the size of the table being less than the limit - return sum <= sizeLimitInBytes ? ViolationState.IN_OBSERVANCE : ViolationState.IN_VIOLATION; - } finally { - rlock.unlock(); - } - } - - @Override - public Iterable<Entry<HRegionInfo,Long>> filterBySubject(TableName table) { - rlock.lock(); - try { - return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<HRegionInfo,Long>>() { - @Override - public boolean apply(Entry<HRegionInfo,Long> input) { - return table.equals(input.getKey().getTable()); - } - }); - } finally { - rlock.unlock(); - } - } - - @Override - public void setCurrentState(TableName table, ViolationState state) { - // Defer the "current state" to the chore - this.chore.setTableQuotaViolation(table, state); - } - - @Override - public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) { - wlock.lock(); - try { - this.regionUsage = Objects.requireNonNull(regionUsage); - } finally { - wlock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java new file mode 100644 index 0000000..548faf8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaSnapshotNotifier.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; + +/** + * A {@link SpaceQuotaSnapshotNotifier} which uses the hbase:quota table. 
+ */ +public class TableSpaceQuotaSnapshotNotifier implements SpaceQuotaSnapshotNotifier { + private static final Log LOG = LogFactory.getLog(TableSpaceQuotaSnapshotNotifier.class); + + private Connection conn; + + @Override + public void transitionTable( + TableName tableName, SpaceQuotaSnapshot snapshot) throws IOException { + final Put p = QuotaTableUtil.createPutSpaceSnapshot(tableName, snapshot); + try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Persisting a space quota snapshot " + snapshot + " for " + tableName); + } + quotaTable.put(p); + } + } + + @Override + public void initialize(Connection conn) { + this.conn = conn; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java deleted file mode 100644 index a8b1c55..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.quotas; - -import java.io.IOException; - -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; - -/** - * A {@link SpaceQuotaViolationNotifier} which uses the hbase:quota table. - */ -public class TableSpaceQuotaViolationNotifier implements SpaceQuotaViolationNotifier { - - private Connection conn; - - @Override - public void transitionTableToViolation( - TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException { - final Put p = QuotaTableUtil.createEnableViolationPolicyUpdate(tableName, violationPolicy); - try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { - quotaTable.put(p); - } - } - - @Override - public void transitionTableToObservance(TableName tableName) throws IOException { - final Delete d = QuotaTableUtil.createRemoveViolationPolicyUpdate(tableName); - try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { - quotaTable.delete(d); - } - } - - @Override - public void initialize(Connection conn) { - this.conn = conn; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java new file mode 100644 index 0000000..2d34d45 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas.policies; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.quotas.SpaceLimitingException; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; +import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * Abstract implementation for {@link SpaceViolationPolicyEnforcement}. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class AbstractViolationPolicyEnforcement + implements SpaceViolationPolicyEnforcement { + + RegionServerServices rss; + TableName tableName; + SpaceQuotaSnapshot quotaSnapshot; + + public void setRegionServerServices(RegionServerServices rss) { + this.rss = Objects.requireNonNull(rss); + } + + public void setTableName(TableName tableName) { + this.tableName = tableName; + } + + public RegionServerServices getRegionServerServices() { + return this.rss; + } + + public TableName getTableName() { + return this.tableName; + } + + public void setQuotaSnapshot(SpaceQuotaSnapshot snapshot) { + this.quotaSnapshot = Objects.requireNonNull(snapshot); + } + + @Override + public SpaceQuotaSnapshot getQuotaSnapshot() { + return this.quotaSnapshot; + } + + @Override + public void initialize(RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) { + setRegionServerServices(rss); + setTableName(tableName); + setQuotaSnapshot(snapshot); + } + + @Override + public boolean areCompactionsDisabled() { + return false; + } + + @Override + public boolean shouldCheckBulkLoads() { + // Reference check. 
The singleton is used when no quota exists to check against + return SpaceQuotaSnapshot.getNoSuchSnapshot() != quotaSnapshot; + } + + @Override + public void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException { + // Compute the amount of space that could be used to save some arithmetic in the for-loop + final long sizeAvailableForBulkLoads = quotaSnapshot.getLimit() - quotaSnapshot.getUsage(); + long size = 0L; + for (String path : paths) { + size += addSingleFile(fs, path); + if (size > sizeAvailableForBulkLoads) { + break; + } + } + if (size > sizeAvailableForBulkLoads) { + throw new SpaceLimitingException(getPolicyName(), "Bulk load of " + paths + + " is disallowed because the file(s) exceed the limits of a space quota."); + } + } + + private long addSingleFile(FileSystem fs, String path) throws SpaceLimitingException { + final FileStatus status; + try { + status = fs.getFileStatus(new Path(Objects.requireNonNull(path))); + } catch (IOException e) { + throw new SpaceLimitingException(getPolicyName(), "Could not verify length of file to bulk load", e); + } + if (!status.isFile()) { + throw new IllegalArgumentException(path + " is not a file."); + } + return status.getLen(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java new file mode 100644 index 0000000..e4171ad --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/BulkLoadVerifyingViolationPolicyEnforcement.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas.policies; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.quotas.SpaceLimitingException; +import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; + +/** + * A {@link SpaceViolationPolicyEnforcement} instance which only checks for bulk loads. Useful for tables + * which have no violation policy. This is the default case for tables, as we want to make sure that + * a single bulk load call would not violate the quota.
+ */ +@InterfaceAudience.Private +public class BulkLoadVerifyingViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement { + + @Override + public void enable() {} + + @Override + public void disable() {} + + @Override + public String getPolicyName() { + return "BulkLoadVerifying"; + } + + @Override + public boolean areCompactionsDisabled() { + return false; + } + + @Override + public void check(Mutation m) throws SpaceLimitingException {} +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java new file mode 100644 index 0000000..0d6d886 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DisableTableViolationPolicyEnforcement.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.quotas.policies; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.quotas.SpaceLimitingException; +import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy; +import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; + +/** + * A {@link SpaceViolationPolicyEnforcement} which disables the table. The enforcement + * counterpart to {@link SpaceViolationPolicy#DISABLE}. + */ +@InterfaceAudience.Private +public class DisableTableViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement { + private static final Log LOG = LogFactory.getLog(DisableTableViolationPolicyEnforcement.class); + + @Override + public void enable() throws IOException { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Starting disable of " + getTableName()); + } + getRegionServerServices().getClusterConnection().getAdmin().disableTable(getTableName()); + if (LOG.isTraceEnabled()) { + LOG.trace("Disable is complete for " + getTableName()); + } + } catch (TableNotEnabledException tnee) { + // The state we wanted it to be in. + } + } + + @Override + public void disable() throws IOException { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Starting enable of " + getTableName()); + } + getRegionServerServices().getClusterConnection().getAdmin().enableTable(getTableName()); + if (LOG.isTraceEnabled()) { + LOG.trace("Enable is complete for " + getTableName()); + } + } catch (TableNotDisabledException tnde) { + // The state we wanted it to be in + } + } + + @Override + public void check(Mutation m) throws SpaceLimitingException { + // If this policy is enacted, then the table is (or should be) disabled. 
+ throw new SpaceLimitingException( + getPolicyName(), "This table is disabled due to violating a space quota."); + } + + @Override + public String getPolicyName() { + return SpaceViolationPolicy.DISABLE.name(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java new file mode 100644 index 0000000..a60cb45 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoInsertsViolationPolicyEnforcement.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} that turns away mutations which add data to the
+ * table. The enforcement counterpart to {@link SpaceViolationPolicy#NO_INSERTS}.
+ */
+@InterfaceAudience.Private
+public class NoInsertsViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+
+  /** Empty: this class keeps no state that needs switching on. */
+  @Override
+  public void enable() {}
+
+  /** Empty: this class keeps no state that needs switching off. */
+  @Override
+  public void disable() {}
+
+  /**
+   * Rejects the mutation when it is an Append, Increment or Put; anything else passes.
+   *
+   * @param m the mutation about to be applied
+   * @throws SpaceLimitingException if {@code m} is one of the three rejected types
+   */
+  @Override
+  public void check(Mutation m) throws SpaceLimitingException {
+    final boolean addsData = m instanceof Put || m instanceof Append || m instanceof Increment;
+    if (!addsData) {
+      // Deletes stay allowed, even though we know they will temporarily increase utilization.
+      return;
+    }
+    final String kind = m.getClass().getSimpleName();
+    throw new SpaceLimitingException(
+        getPolicyName(), kind + "s are disallowed due to a space quota.");
+  }
+
+  /** @return the name of {@link SpaceViolationPolicy#NO_INSERTS} */
+  @Override
+  public String getPolicyName() {
+    return SpaceViolationPolicy.NO_INSERTS.name();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java
new file mode 100644
index 0000000..e7f872c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesCompactionsViolationPolicyEnforcement.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} that, on top of what its parent class rejects,
+ * reports compactions as disabled while the policy is enabled. The enforcement counterpart to
+ * {@link SpaceViolationPolicy#NO_WRITES_COMPACTIONS}.
+ */
+@InterfaceAudience.Private
+public class NoWritesCompactionsViolationPolicyEnforcement
+    extends NoWritesViolationPolicyEnforcement {
+  private static final Log LOG = LogFactory.getLog(
+      NoWritesCompactionsViolationPolicyEnforcement.class);
+
+  // true while this policy is enabled; read by areCompactionsDisabled().
+  private AtomicBoolean disableCompactions = new AtomicBoolean(false);
+
+  /** Flips the flag to "compactions disabled"; traces when it was already set. */
+  @Override
+  public synchronized void enable() {
+    if (disableCompactions.compareAndSet(false, true)) {
+      return;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Compactions were already disabled upon enabling the policy");
+    }
+  }
+
+  /** Flips the flag back to "compactions allowed"; traces when it was already cleared. */
+  @Override
+  public synchronized void disable() {
+    if (disableCompactions.compareAndSet(true, false)) {
+      return;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Compactions were already enabled upon disabling the policy");
+    }
+  }
+
+  /** @return the name of {@link SpaceViolationPolicy#NO_WRITES_COMPACTIONS} */
+  @Override
+  public String getPolicyName() {
+    return SpaceViolationPolicy.NO_WRITES_COMPACTIONS.name();
+  }
+
+  /** @return the current value of the flag set by {@link #enable()} and {@link #disable()} */
+  @Override
+  public boolean areCompactionsDisabled() {
+    return disableCompactions.get();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
new file mode 100644
index 0000000..a04f418
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/NoWritesViolationPolicyEnforcement.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas.policies;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
+import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
+
+/**
+ * A {@link SpaceViolationPolicyEnforcement} that refuses Append, Delete, Increment and Put
+ * mutations. The enforcement counterpart to {@link SpaceViolationPolicy#NO_WRITES}.
+ */
+@InterfaceAudience.Private
+public class NoWritesViolationPolicyEnforcement extends AbstractViolationPolicyEnforcement {
+
+  /** Empty: this class keeps no state that needs switching on. */
+  @Override
+  public void enable() {}
+
+  /** Empty: this class keeps no state that needs switching off. */
+  @Override
+  public void disable() {}
+
+  /**
+   * Rejects the mutation when it is an Append, Delete, Increment or Put.
+   *
+   * @param m the mutation about to be applied
+   * @throws SpaceLimitingException if {@code m} is one of the four rejected types
+   */
+  @Override
+  public void check(Mutation m) throws SpaceLimitingException {
+    final boolean isWrite =
+        m instanceof Put || m instanceof Delete || m instanceof Append || m instanceof Increment;
+    if (!isWrite) {
+      return;
+    }
+    final String kind = m.getClass().getSimpleName();
+    throw new SpaceLimitingException(
+        getPolicyName(), kind + "s are disallowed due to a space quota.");
+  }
+
+  /** @return the name of {@link SpaceViolationPolicy#NO_WRITES} */
+  @Override
+  public String getPolicyName() {
+    return SpaceViolationPolicy.NO_WRITES.name();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index eba984a..9aa0042 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
@@ -337,6 +338,17 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
       if (compaction == null) return null; // message logged inside
     }
 
+    final
RegionServerSpaceQuotaManager spaceQuotaManager = + this.server.getRegionServerSpaceQuotaManager(); + if (null != spaceQuotaManager && spaceQuotaManager.areCompactionsDisabled( + r.getTableDesc().getTableName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation " + + " policy disallows compactions."); + } + return null; + } + // We assume that most compactions are small. So, put system compactions into small // pool; we will do selection there, and move to large pool if necessary. ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 4ce43c9..fad21b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -94,8 +94,12 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.MasterRpcServices; +import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; +import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import 
org.apache.hadoop.hbase.regionserver.Leases.Lease; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; @@ -193,7 +197,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; -import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.DNS; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -559,8 +562,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, ByteArrayComparable comparator, - RegionActionResult.Builder builder) throws IOException { + CompareOp compareOp, ByteArrayComparable comparator, RegionActionResult.Builder builder, + ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { if (!region.getRegionInfo().isMetaTable()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } @@ -579,10 +582,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } switch (type) { case PUT: - rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); + Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); + rm.add(put); break; case DELETE: - rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); + Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); + rm.add(del); break; default: throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); @@ 
-609,10 +616,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @throws IOException */ private Result append(final Region region, final OperationQuota quota, - final MutationProto mutation, final CellScanner cellScanner, long nonceGroup) + final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, + ActivePolicyEnforcement spaceQuota) throws IOException { long before = EnvironmentEdgeManager.currentTime(); Append append = ProtobufUtil.toAppend(mutation, cellScanner); + spaceQuota.getPolicyEnforcement(region).check(append); quota.addMutation(append); Result r = null; if (region.getCoprocessorHost() != null) { @@ -657,10 +666,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @throws IOException */ private Result increment(final Region region, final OperationQuota quota, - final MutationProto mutation, final CellScanner cells, long nonceGroup) + final MutationProto mutation, final CellScanner cells, long nonceGroup, + ActivePolicyEnforcement spaceQuota) throws IOException { long before = EnvironmentEdgeManager.currentTime(); Increment increment = ProtobufUtil.toIncrement(mutation, cells); + spaceQuota.getPolicyEnforcement(region).check(increment); quota.addMutation(increment); Result r = null; if (region.getCoprocessorHost() != null) { @@ -712,7 +723,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private List<CellScannable> doNonAtomicRegionMutation(final Region region, final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, - final RegionScannersCloseCallBack closeCallBack, RpcCallContext context) { + final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, + ActivePolicyEnforcement spaceQuotaEnforcement) { // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do // one at a time, we instead pass them in batch. 
Be aware that the corresponding // ResultOrException instance that matches each Put or Delete is then added down in the @@ -805,15 +817,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && !mutations.isEmpty()) { // Flush out any Puts or Deletes already collected. - doBatchOp(builder, region, quota, mutations, cellScanner); + doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); mutations.clear(); } switch (type) { case APPEND: - r = append(region, quota, action.getMutation(), cellScanner, nonceGroup); + r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, + spaceQuotaEnforcement); break; case INCREMENT: - r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); + r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, + spaceQuotaEnforcement); break; case PUT: case DELETE: @@ -864,7 +878,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // Finish up any outstanding mutations if (mutations != null && !mutations.isEmpty()) { - doBatchOp(builder, region, quota, mutations, cellScanner); + doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); } return cellsToReturn; } @@ -878,7 +892,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private void doBatchOp(final RegionActionResult.Builder builder, final Region region, final OperationQuota quota, final List<ClientProtos.Action> mutations, - final CellScanner cells) { + final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; @@ -895,6 +909,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, batchContainsDelete = true; } mArray[i++] = mutation; + // Check if a space quota disallows 
this mutation + spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation); quota.addMutation(mutation); } @@ -1265,10 +1281,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return regionServer.getConfiguration(); } - private RegionServerRpcQuotaManager getQuotaManager() { + private RegionServerRpcQuotaManager getRpcQuotaManager() { return regionServer.getRegionServerRpcQuotaManager(); } + private RegionServerSpaceQuotaManager getSpaceQuotaManager() { + return regionServer.getRegionServerSpaceQuotaManager(); + } + void start() { rpcServer.start(); } @@ -1443,6 +1463,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); requestCount.increment(); Region region = getRegion(request.getRegion()); + if (QuotaUtil.isQuotaEnabled(getConfiguration()) && + this.regionServer.getRegionServerSpaceQuotaManager().areCompactionsDisabled( + region.getTableDesc().getTableName())) { + throw new DoNotRetryIOException("Compactions on this region are " + + "disabled due to a space quota violation."); + } region.startRegionOperation(Operation.COMPACT_REGION); LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); boolean major = false; @@ -2129,6 +2155,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean loaded = false; Map<byte[], List<Path>> map = null; + // Check to see if this bulk load would exceed the space quota for this table + if (QuotaUtil.isQuotaEnabled(getConfiguration())) { + ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements(); + SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(region); + if (null != enforcement) { + // Bulk loads must still be atomic. We must enact all or none. 
+ List<String> filePaths = new ArrayList<>(request.getFamilyPathCount()); + for (FamilyPath familyPath : request.getFamilyPathList()) { + filePaths.add(familyPath.getPath()); + } + // Check if the batch of files exceeds the current quota + enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths); + } + } + if (!request.hasBulkToken()) { // Old style bulk load. This will not be supported in future releases List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount()); @@ -2257,7 +2298,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Boolean existence = null; Result r = null; RpcCallContext context = RpcServer.getCurrentCall(); - quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); + quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); Get clientGet = ProtobufUtil.toGet(get); if (get.getExistenceOnly() && region.getCoprocessorHost() != null) { @@ -2395,6 +2436,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, this.rpcMultiRequestCount.increment(); Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request .getRegionActionCount()); + ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); for (RegionAction regionAction : request.getRegionActionList()) { this.requestCount.add(regionAction.getActionCount()); OperationQuota quota; @@ -2403,7 +2445,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, RegionSpecifier regionSpecifier = regionAction.getRegion(); try { region = getRegion(regionSpecifier); - quota = getQuotaManager().checkQuota(region, regionAction.getActionList()); + quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList()); } catch (IOException e) { rpcServer.getMetrics().exception(e); regionActionResultBuilder.setException(ResponseConverter.buildException(e)); @@ -2431,7 +2473,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler, ProtobufUtil.toComparator(condition.getComparator()); processed = checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, qualifier, compareOp, - comparator, regionActionResultBuilder); + comparator, regionActionResultBuilder, spaceQuotaEnforcement); } else { mutateRows(region, regionAction.getActionList(), cellScanner, regionActionResultBuilder); @@ -2452,7 +2494,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, context.setCallBack(closeCallBack); } cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, - regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context); + regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context, + spaceQuotaEnforcement); } responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); quota.close(); @@ -2519,6 +2562,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, CellScanner cellScanner = controller != null ? controller.cellScanner() : null; OperationQuota quota = null; RpcCallContext context = RpcServer.getCurrentCall(); + ActivePolicyEnforcement spaceQuotaEnforcement = null; // Clear scanner so we are not holding on to reference across call. if (controller != null) { controller.setCellScanner(null); @@ -2538,19 +2582,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Boolean processed = null; MutationType type = mutation.getMutateType(); - quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE); + quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE); + spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); switch (type) { case APPEND: // TODO: this doesn't actually check anything. 
- r = append(region, quota, mutation, cellScanner, nonceGroup); + r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); break; case INCREMENT: // TODO: this doesn't actually check anything. - r = increment(region, quota, mutation, cellScanner, nonceGroup); + r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); break; case PUT: Put put = ProtobufUtil.toPut(mutation, cellScanner); + // Throws an exception when violated + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); quota.addMutation(put); if (request.hasCondition()) { Condition condition = request.getCondition(); @@ -2580,6 +2627,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, break; case DELETE: Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); quota.addMutation(delete); if (request.hasCondition()) { Condition condition = request.getCondition(); @@ -2998,7 +3046,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } OperationQuota quota; try { - quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); + quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); } catch (IOException e) { addScannerLeaseBack(lease); throw new ServiceException(e); http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java new file mode 100644 index 0000000..888978d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Shared helpers for space-quota tests: creating tables (optionally in a namespace), attaching
+ * space quotas to them, and writing a requested number of bytes into a table.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaHelperForTests {
+  private static final Log LOG = LogFactory.getLog(SpaceQuotaHelperForTests.class);
+
+  public static final int SIZE_PER_VALUE = 256;
+  public static final String F1 = "f1";
+  public static final long ONE_KILOBYTE = 1024L;
+  public static final long ONE_MEGABYTE = ONE_KILOBYTE * ONE_KILOBYTE;
+
+  private final HBaseTestingUtility testUtil;
+  private final TestName testName;
+  private final AtomicLong counter;
+
+  /**
+   * @param testUtil the testing utility used to reach the cluster; must not be null
+   * @param testName supplies the current test method name used in table names; must not be null
+   * @param counter source of unique suffixes for table and namespace names; must not be null
+   */
+  public SpaceQuotaHelperForTests(
+      HBaseTestingUtility testUtil, TestName testName, AtomicLong counter) {
+    this.testUtil = Objects.requireNonNull(testUtil);
+    this.testName = Objects.requireNonNull(testName);
+    this.counter = Objects.requireNonNull(counter);
+  }
+
+  //
+  // Helpers
+  //
+
+  /**
+   * Writes random values of {@link #SIZE_PER_VALUE} bytes each into family {@link #F1} of the
+   * given table until at least {@code sizeInBytes} bytes of values have been written, then
+   * flushes the table.
+   *
+   * @param tn the table to write to
+   * @param sizeInBytes how many bytes of values to write (row key and column are not counted)
+   * @throws IOException if writing, flushing or closing the table fails
+   */
+  void writeData(TableName tn, long sizeInBytes) throws IOException {
+    final Connection conn = testUtil.getConnection();
+    // try-with-resources closes the Table on every exit path.
+    try (Table table = conn.getTable(tn)) {
+      List<Put> updates = new ArrayList<>();
+      long bytesToWrite = sizeInBytes;
+      long rowKeyId = 0L;
+      final StringBuilder sb = new StringBuilder();
+      final Random r = new Random();
+      while (bytesToWrite > 0L) {
+        sb.setLength(0);
+        sb.append(Long.toString(rowKeyId));
+        // Use the reverse counter as the rowKey to get even spread across all regions
+        Put p = new Put(Bytes.toBytes(sb.reverse().toString()));
+        byte[] value = new byte[SIZE_PER_VALUE];
+        r.nextBytes(value);
+        p.addColumn(Bytes.toBytes(F1), Bytes.toBytes("q1"), value);
+        updates.add(p);
+
+        // Batch ~13KB worth of updates
+        if (updates.size() > 50) {
+          table.put(updates);
+          updates.clear();
+        }
+
+        // Just count the value size, ignore the size of rowkey + column
+        bytesToWrite -= SIZE_PER_VALUE;
+        rowKeyId++;
+      }
+
+      // Write the final batch
+      if (!updates.isEmpty()) {
+        table.put(updates);
+      }
+
+      LOG.debug("Data was written to HBase");
+      // Push the data to disk.
+      testUtil.getAdmin().flush(tn);
+      LOG.debug("Data flushed to disk");
+    }
+  }
+
+  /**
+   * Creates two tables in the default namespace and three tables in a fresh namespace, then
+   * sets a table quota on each of the first two, a namespace quota on the new namespace, and
+   * an additional table quota on the last namespaced table.
+   *
+   * @return every created table mapped to the quota settings that were set for it; the last
+   *     namespaced table maps to both the namespace quota and its own table quota
+   */
+  Multimap<TableName, QuotaSettings> createTablesWithSpaceQuotas() throws Exception {
+    final Admin admin = testUtil.getAdmin();
+    final Multimap<TableName, QuotaSettings> tablesWithQuotas = HashMultimap.create();
+
+    final TableName tn1 = createTable();
+    final TableName tn2 = createTable();
+
+    NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + counter.getAndIncrement()).build();
+    admin.createNamespace(nd);
+    final TableName tn3 = createTableInNamespace(nd);
+    final TableName tn4 = createTableInNamespace(nd);
+    final TableName tn5 = createTableInNamespace(nd);
+
+    final long sizeLimit1 = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB
+    final SpaceViolationPolicy violationPolicy1 = SpaceViolationPolicy.NO_WRITES;
+    QuotaSettings qs1 = QuotaSettingsFactory.limitTableSpace(tn1, sizeLimit1, violationPolicy1);
+    tablesWithQuotas.put(tn1, qs1);
+    admin.setQuota(qs1);
+
+    final long sizeLimit2 = 1024L * 1024L * 1024L * 200L; // 200GB
+    final SpaceViolationPolicy violationPolicy2 = SpaceViolationPolicy.NO_WRITES_COMPACTIONS;
+    QuotaSettings qs2 = QuotaSettingsFactory.limitTableSpace(tn2, sizeLimit2, violationPolicy2);
+    tablesWithQuotas.put(tn2, qs2);
+    admin.setQuota(qs2);
+
+    final long sizeLimit3 = 1024L * 1024L * 1024L * 1024L * 100L; // 100TB
+    final SpaceViolationPolicy violationPolicy3 = SpaceViolationPolicy.NO_INSERTS;
+    QuotaSettings qs3 = QuotaSettingsFactory.limitNamespaceSpace(
+        nd.getName(), sizeLimit3, violationPolicy3);
+    tablesWithQuotas.put(tn3, qs3);
+    tablesWithQuotas.put(tn4, qs3);
+    tablesWithQuotas.put(tn5, qs3);
+    admin.setQuota(qs3);
+
+    final long sizeLimit4 = 1024L * 1024L * 1024L * 5L; // 5GB
+    final SpaceViolationPolicy violationPolicy4 = SpaceViolationPolicy.NO_INSERTS;
+    QuotaSettings qs4 = QuotaSettingsFactory.limitTableSpace(tn5, sizeLimit4, violationPolicy4);
+    // Override the ns quota for tn5, important edge-case to catch table quota taking
+    // precedence over ns quota.
+    tablesWithQuotas.put(tn5, qs4);
+    admin.setQuota(qs4);
+
+    return tablesWithQuotas;
+  }
+
+  /** Creates a single-region table in the default namespace. */
+  TableName createTable() throws Exception {
+    return createTableWithRegions(1);
+  }
+
+  /** Creates a table with {@code numRegions} regions in the default namespace. */
+  TableName createTableWithRegions(int numRegions) throws Exception {
+    return createTableWithRegions(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR, numRegions);
+  }
+
+  /**
+   * Creates a table with family {@link #F1} in the given namespace. The table name is the
+   * current test method name followed by the next counter value; a table that already exists
+   * under that name is disabled and deleted first.
+   *
+   * @param namespace the namespace to create the table in
+   * @param numRegions 1 for a plain table; otherwise the table is created with this many
+   *     regions between the keys "0" and "9"
+   * @return the name of the created table
+   */
+  TableName createTableWithRegions(String namespace, int numRegions) throws Exception {
+    final Admin admin = testUtil.getAdmin();
+    final TableName tn = TableName.valueOf(
+        namespace, testName.getMethodName() + counter.getAndIncrement());
+
+    // Delete the old table
+    if (admin.tableExists(tn)) {
+      admin.disableTable(tn);
+      admin.deleteTable(tn);
+    }
+
+    // Create the table
+    HTableDescriptor tableDesc = new HTableDescriptor(tn);
+    tableDesc.addFamily(new HColumnDescriptor(F1));
+    if (numRegions == 1) {
+      admin.createTable(tableDesc);
+    } else {
+      admin.createTable(tableDesc, Bytes.toBytes("0"), Bytes.toBytes("9"), numRegions);
+    }
+    return tn;
+  }
+
+  /**
+   * Creates a single-region table in the namespace described by {@code nd}.
+   *
+   * @param nd descriptor of the namespace to create the table in
+   * @return the name of the created table
+   */
+  TableName createTableInNamespace(NamespaceDescriptor nd) throws Exception {
+    // Same naming, drop-if-exists and create steps as the single-region path of
+    // createTableWithRegions, so reuse it rather than repeating them here.
+    return createTableWithRegions(nd.getName(), 1);
+  }
+
+  /**
+   * Splits the tables in {@code quotas} by the target of each quota entry: a table goes into
+   * {@code tablesWithTableQuota} when the entry names a table and into
+   * {@code tablesWithNamespaceQuota} when it names a namespace. Fails the test for an entry
+   * that names neither.
+   *
+   * @param quotas table-to-quota entries; every value is cast to SpaceLimitSettings
+   * @param tablesWithTableQuota output set, added to in place
+   * @param tablesWithNamespaceQuota output set, added to in place
+   */
+  void partitionTablesByQuotaTarget(Multimap<TableName,QuotaSettings> quotas,
+      Set<TableName> tablesWithTableQuota, Set<TableName> tablesWithNamespaceQuota) {
+    // Partition the tables with quotas by table and ns quota
+    for (Entry<TableName, QuotaSettings> entry : quotas.entries()) {
+      SpaceLimitSettings settings = (SpaceLimitSettings) entry.getValue();
+      TableName tn = entry.getKey();
+      if (null != settings.getTableName()) {
+        tablesWithTableQuota.add(tn);
+      }
+      if (null != settings.getNamespace()) {
+        tablesWithNamespaceQuota.add(tn);
+      }
+
+      if (null == settings.getTableName() && null == settings.getNamespace()) {
+        fail("Unexpected table name with null tableName and namespace: " + tn);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java
new file mode 100644
index 0000000..0986e8c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaSnapshotNotifierForTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.quotas; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; + +/** + * A SpaceQuotaSnapshotNotifier implementation for testing; it keeps the most recent snapshot per table in memory. + */ +@InterfaceAudience.Private +public class SpaceQuotaSnapshotNotifierForTest implements SpaceQuotaSnapshotNotifier { + private static final Log LOG = LogFactory.getLog(SpaceQuotaSnapshotNotifierForTest.class); + /* Most recent snapshot per table; every method that touches this map is synchronized. */ + private final Map<TableName,SpaceQuotaSnapshot> tableQuotaSnapshots = new HashMap<>(); + /** No-op: the connection is not used by this notifier. */ + @Override + public void initialize(Connection conn) {} + /** Stores {@code snapshot} under {@code tableName}, replacing any snapshot stored earlier for that table. */ + @Override + public synchronized void transitionTable(TableName tableName, SpaceQuotaSnapshot snapshot) { + if (LOG.isTraceEnabled()) { + LOG.trace("Persisting " + tableName + "=>" + snapshot); + } + tableQuotaSnapshots.put(tableName, snapshot); + } + /** Returns a new map holding the snapshots stored so far; later transitions are not reflected in it. */ + public synchronized Map<TableName,SpaceQuotaSnapshot> copySnapshots() { + return new HashMap<>(this.tableQuotaSnapshots); + } + /** Discards every stored snapshot. */ + public synchronized void clearSnapshots() { + this.tableQuotaSnapshots.clear(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java new file mode 100644 index 0000000..80363e8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestActivePolicyEnforcement.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.quotas.policies.NoWritesViolationPolicyEnforcement; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test class for {@link ActivePolicyEnforcement}. 
+ */ +@Category(SmallTests.class) +public class TestActivePolicyEnforcement { + /** An enforcement registered for a table is handed back unchanged when that table is looked up. */ + @Test + public void testGetter() { + final TableName tableName = TableName.valueOf("table"); + Map<TableName, SpaceViolationPolicyEnforcement> map = new HashMap<>(); + map.put(tableName, new NoWritesViolationPolicyEnforcement()); + ActivePolicyEnforcement ape = new ActivePolicyEnforcement(map, Collections.emptyMap(), null); + assertEquals(map.get(tableName), ape.getPolicyEnforcement(tableName)); + } + /** A table without a registered policy still yields a non-null BulkLoadVerifyingViolationPolicyEnforcement. */ + @Test + public void testNoPolicyReturnsNoopEnforcement() { + ActivePolicyEnforcement ape = new ActivePolicyEnforcement( + new HashMap<>(), Collections.emptyMap(), mock(RegionServerServices.class)); + SpaceViolationPolicyEnforcement enforcement = ape.getPolicyEnforcement( + TableName.valueOf("nonexistent")); + assertNotNull(enforcement); + assertTrue( + "Expected an instance of BulkLoadVerifyingViolationPolicyEnforcement", + enforcement instanceof BulkLoadVerifyingViolationPolicyEnforcement); + } + /** With neither a policy nor a snapshot for the table, the returned enforcement must not check bulk loads. */ + @Test + public void testNoBulkLoadChecksOnNoSnapshot() { + ActivePolicyEnforcement ape = new ActivePolicyEnforcement( + new HashMap<TableName, SpaceViolationPolicyEnforcement>(), + Collections.<TableName,SpaceQuotaSnapshot> emptyMap(), + mock(RegionServerServices.class)); + SpaceViolationPolicyEnforcement enforcement = ape.getPolicyEnforcement( + TableName.valueOf("nonexistent")); + assertFalse("Should not check bulkloads", enforcement.shouldCheckBulkLoads()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java index ad98720..18e47af 100644 ---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java @@ -144,6 +144,7 @@ public class TestFileSystemUtilizationChore { assertEquals(timeUnit, chore.getTimeUnit()); } + @SuppressWarnings("unchecked") @Test public void testProcessingLeftoverRegions() { final Configuration conf = getDefaultHBaseConfiguration(); @@ -176,6 +177,7 @@ public class TestFileSystemUtilizationChore { chore.chore(); } + @SuppressWarnings("unchecked") @Test public void testProcessingNowOfflineLeftoversAreIgnored() { final Configuration conf = getDefaultHBaseConfiguration(); @@ -185,7 +187,6 @@ public class TestFileSystemUtilizationChore { final List<Long> leftover1Sizes = Arrays.asList(1024L, 4096L); final long leftover1Sum = sum(leftover1Sizes); final List<Long> leftover2Sizes = Arrays.asList(2048L); - final long leftover2Sum = sum(leftover2Sizes); final Region lr1 = mockRegionWithSize(leftover1Sizes); final Region lr2 = mockRegionWithSize(leftover2Sizes); http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java index 8182513..4a7258f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; -import 
org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; @@ -45,7 +44,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; /** - * Test class for {@link NamespaceQuotaViolationStore}. + * Test class for {@link NamespaceQuotaSnapshotStore}. */ @Category(SmallTests.class) public class TestNamespaceQuotaViolationStore { @@ -54,19 +53,19 @@ public class TestNamespaceQuotaViolationStore { private Connection conn; private QuotaObserverChore chore; private Map<HRegionInfo, Long> regionReports; - private NamespaceQuotaViolationStore store; + private NamespaceQuotaSnapshotStore store; @Before public void setup() { conn = mock(Connection.class); chore = mock(QuotaObserverChore.class); regionReports = new HashMap<>(); - store = new NamespaceQuotaViolationStore(conn, chore, regionReports); + store = new NamespaceQuotaSnapshotStore(conn, chore, regionReports); } @Test public void testGetSpaceQuota() throws Exception { - NamespaceQuotaViolationStore mockStore = mock(NamespaceQuotaViolationStore.class); + NamespaceQuotaSnapshotStore mockStore = mock(NamespaceQuotaSnapshotStore.class); when(mockStore.getSpaceQuota(any(String.class))).thenCallRealMethod(); Quotas quotaWithSpace = Quotas.newBuilder().setSpace( @@ -113,17 +112,18 @@ public class TestNamespaceQuotaViolationStore { regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), Bytes.toBytes(2)), 1024L * 256L); // Below the quota - assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota)); + assertEquals(false, store.getTargetState(NS, quota).getQuotaStatus().isInViolation()); regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(2), Bytes.toBytes(3)), 1024L * 256L); // Equal to the quota is still in observance - assertEquals(ViolationState.IN_OBSERVANCE, 
store.getTargetState(NS, quota)); + assertEquals(false, store.getTargetState(NS, quota).getQuotaStatus().isInViolation()); regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(3), Bytes.toBytes(4)), 1024L); // Exceeds the quota, should be in violation - assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(NS, quota)); + assertEquals(true, store.getTargetState(NS, quota).getQuotaStatus().isInViolation()); + assertEquals(SpaceViolationPolicy.DISABLE, store.getTargetState(NS, quota).getQuotaStatus().getPolicy()); } @Test http://git-wip-us.apache.org/repos/asf/hbase/blob/b99e9cf9/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java index db549e4..da294c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java @@ -17,8 +17,6 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -28,8 +26,6 @@ import java.util.Map; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; @@ -50,8 +46,6 @@ public class TestQuotaObserverChore { public void setup() throws Exception { conn = mock(Connection.class); chore = 
mock(QuotaObserverChore.class); - // Set up some rules to call the real method on the mock. - when(chore.getViolationPolicy(any(SpaceQuota.class))).thenCallRealMethod(); } @Test @@ -76,31 +70,11 @@ public class TestQuotaObserverChore { regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L); } - TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, regionReports); - when(chore.getTableViolationStore()).thenReturn(store); + TableQuotaSnapshotStore store = new TableQuotaSnapshotStore(conn, chore, regionReports); + when(chore.getTableSnapshotStore()).thenReturn(store); assertEquals(numTable1Regions, Iterables.size(store.filterBySubject(tn1))); assertEquals(numTable2Regions, Iterables.size(store.filterBySubject(tn2))); assertEquals(numTable3Regions, Iterables.size(store.filterBySubject(tn3))); } - - @Test - public void testExtractViolationPolicy() { - for (SpaceViolationPolicy policy : SpaceViolationPolicy.values()) { - SpaceQuota spaceQuota = SpaceQuota.newBuilder() - .setSoftLimit(1024L) - .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy)) - .build(); - assertEquals(policy, chore.getViolationPolicy(spaceQuota)); - } - SpaceQuota malformedQuota = SpaceQuota.newBuilder() - .setSoftLimit(1024L) - .build(); - try { - chore.getViolationPolicy(malformedQuota); - fail("Should have thrown an IllegalArgumentException."); - } catch (IllegalArgumentException e) { - // Pass - } - } } \ No newline at end of file