http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java index c493b25..943c898 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java @@ -22,16 +22,12 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -40,20 +36,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceNotFoundException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -62,7 +53,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; @@ -72,11 +62,8 @@ import com.google.common.collect.Multimap; @Category(LargeTests.class) public class TestQuotaObserverChoreWithMiniCluster { private static final Log LOG = LogFactory.getLog(TestQuotaObserverChoreWithMiniCluster.class); - private static final int SIZE_PER_VALUE = 256; - private static final String F1 = "f1"; private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final AtomicLong COUNTER = new AtomicLong(0); - private static final long ONE_MEGABYTE = 1024L * 1024L; private static final long DEFAULT_WAIT_MILLIS = 500; @Rule @@ -84,18 +71,19 @@ public class TestQuotaObserverChoreWithMiniCluster { private HMaster master; private QuotaObserverChore chore; - private SpaceQuotaViolationNotifierForTest violationNotifier; + private SpaceQuotaSnapshotNotifierForTest snapshotNotifier; + private SpaceQuotaHelperForTests helper; @BeforeClass public static void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); - conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000); - conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000); + conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); + conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); 
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); - conf.setClass(SpaceQuotaViolationNotifierFactory.VIOLATION_NOTIFIER_KEY, - SpaceQuotaViolationNotifierForTest.class, SpaceQuotaViolationNotifier.class); + conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY, + SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class); TEST_UTIL.startMiniCluster(1); } @@ -131,40 +119,55 @@ public class TestQuotaObserverChoreWithMiniCluster { } master = TEST_UTIL.getMiniHBaseCluster().getMaster(); - violationNotifier = - (SpaceQuotaViolationNotifierForTest) master.getSpaceQuotaViolationNotifier(); - violationNotifier.clearTableViolations(); + snapshotNotifier = + (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier(); + snapshotNotifier.clearSnapshots(); chore = master.getQuotaObserverChore(); + helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); } @Test public void testTableViolatesQuota() throws Exception { - TableName tn = createTableWithRegions(10); + TableName tn = helper.createTableWithRegions(10); - final long sizeLimit = 2L * ONE_MEGABYTE; + final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS; QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy); TEST_UTIL.getAdmin().setQuota(settings); // Write more data than should be allowed - writeData(tn, 3L * ONE_MEGABYTE); - - Map<TableName,SpaceViolationPolicy> violatedQuotas = violationNotifier.snapshotTablesInViolation(); - while (violatedQuotas.isEmpty()) { - LOG.info("Found no violated quotas, sleeping and retrying. 
Current reports: " - + master.getMasterQuotaManager().snapshotRegionSizes()); - try { - Thread.sleep(DEFAULT_WAIT_MILLIS); - } catch (InterruptedException e) { - LOG.debug("Interrupted while sleeping.", e); - Thread.currentThread().interrupt(); + helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE); + + Map<TableName,SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots(); + boolean foundSnapshot = false; + while (!foundSnapshot) { + if (quotaSnapshots.isEmpty()) { + LOG.info("Found no violated quotas, sleeping and retrying. Current reports: " + + master.getMasterQuotaManager().snapshotRegionSizes()); + sleepWithInterrupt(DEFAULT_WAIT_MILLIS); + quotaSnapshots = snapshotNotifier.copySnapshots(); + } else { + Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet()); + assertEquals(tn, entry.getKey()); + final SpaceQuotaSnapshot snapshot = entry.getValue(); + if (!snapshot.getQuotaStatus().isInViolation()) { + LOG.info("Found a snapshot, but it was not yet in violation. 
" + snapshot); + sleepWithInterrupt(DEFAULT_WAIT_MILLIS); + quotaSnapshots = snapshotNotifier.copySnapshots(); + } else { + foundSnapshot = true; + } } - violatedQuotas = violationNotifier.snapshotTablesInViolation(); } - Entry<TableName,SpaceViolationPolicy> entry = Iterables.getOnlyElement(violatedQuotas.entrySet()); + Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet()); assertEquals(tn, entry.getKey()); - assertEquals(violationPolicy, entry.getValue()); + final SpaceQuotaSnapshot snapshot = entry.getValue(); + assertEquals("Snapshot was " + snapshot, violationPolicy, snapshot.getQuotaStatus().getPolicy()); + assertEquals(sizeLimit, snapshot.getLimit()); + assertTrue( + "The usage should be greater than the limit, but were " + snapshot.getUsage() + " and " + + snapshot.getLimit() + ", respectively", snapshot.getUsage() > snapshot.getLimit()); } @Test @@ -179,18 +182,18 @@ public class TestQuotaObserverChoreWithMiniCluster { admin.createNamespace(desc); } - TableName tn1 = createTableWithRegions(namespace, 5); - TableName tn2 = createTableWithRegions(namespace, 5); - TableName tn3 = createTableWithRegions(namespace, 5); + TableName tn1 = helper.createTableWithRegions(namespace, 5); + TableName tn2 = helper.createTableWithRegions(namespace, 5); + TableName tn3 = helper.createTableWithRegions(namespace, 5); - final long sizeLimit = 5L * ONE_MEGABYTE; + final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE; final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE; QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy); admin.setQuota(settings); - writeData(tn1, 2L * ONE_MEGABYTE); + helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); admin.flush(tn1); - Map<TableName,SpaceViolationPolicy> violatedQuotas = violationNotifier.snapshotTablesInViolation(); + Map<TableName,SpaceQuotaSnapshot> violatedQuotas = 
snapshotNotifier.copySnapshots(); for (int i = 0; i < 5; i++) { // Check a few times to make sure we don't prematurely move to violation assertEquals("Should not see any quota violations after writing 2MB of data", 0, violatedQuotas.size()); @@ -199,12 +202,12 @@ public class TestQuotaObserverChoreWithMiniCluster { } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping." , e); } - violatedQuotas = violationNotifier.snapshotTablesInViolation(); + violatedQuotas = snapshotNotifier.copySnapshots(); } - writeData(tn2, 2L * ONE_MEGABYTE); + helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); admin.flush(tn2); - violatedQuotas = violationNotifier.snapshotTablesInViolation(); + violatedQuotas = snapshotNotifier.copySnapshots(); for (int i = 0; i < 5; i++) { // Check a few times to make sure we don't prematurely move to violation assertEquals("Should not see any quota violations after writing 4MB of data", 0, @@ -214,14 +217,14 @@ public class TestQuotaObserverChoreWithMiniCluster { } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping." , e); } - violatedQuotas = violationNotifier.snapshotTablesInViolation(); + violatedQuotas = snapshotNotifier.copySnapshots(); } // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total) // and should push all three tables in the namespace into violation. - writeData(tn3, 2L * ONE_MEGABYTE); + helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); admin.flush(tn3); - violatedQuotas = violationNotifier.snapshotTablesInViolation(); + violatedQuotas = snapshotNotifier.copySnapshots(); while (violatedQuotas.size() < 3) { LOG.debug("Saw fewer violations than desired (expected 3): " + violatedQuotas + ". 
Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); @@ -231,18 +234,18 @@ public class TestQuotaObserverChoreWithMiniCluster { LOG.debug("Interrupted while sleeping.", e); Thread.currentThread().interrupt(); } - violatedQuotas = violationNotifier.snapshotTablesInViolation(); + violatedQuotas = snapshotNotifier.copySnapshots(); } - SpaceViolationPolicy vp1 = violatedQuotas.remove(tn1); - assertNotNull("tn1 should be in violation", vp1); - assertEquals(violationPolicy, vp1); - SpaceViolationPolicy vp2 = violatedQuotas.remove(tn2); - assertNotNull("tn2 should be in violation", vp2); - assertEquals(violationPolicy, vp2); - SpaceViolationPolicy vp3 = violatedQuotas.remove(tn3); - assertNotNull("tn3 should be in violation", vp3); - assertEquals(violationPolicy, vp3); + SpaceQuotaSnapshot snapshot1 = violatedQuotas.remove(tn1); + assertNotNull("tn1 should be in violation", snapshot1); + assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy()); + SpaceQuotaSnapshot snapshot2 = violatedQuotas.remove(tn2); + assertNotNull("tn2 should be in violation", snapshot2); + assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy()); + SpaceQuotaSnapshot snapshot3 = violatedQuotas.remove(tn3); + assertNotNull("tn3 should be in violation", snapshot3); + assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy()); assertTrue("Unexpected additional quota violations: " + violatedQuotas, violatedQuotas.isEmpty()); } @@ -258,18 +261,18 @@ public class TestQuotaObserverChoreWithMiniCluster { admin.createNamespace(desc); } - TableName tn1 = createTableWithRegions(namespace, 5); - TableName tn2 = createTableWithRegions(namespace, 5); + TableName tn1 = helper.createTableWithRegions(namespace, 5); + TableName tn2 = helper.createTableWithRegions(namespace, 5); - final long namespaceSizeLimit = 3L * ONE_MEGABYTE; + final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE; final SpaceViolationPolicy namespaceViolationPolicy 
= SpaceViolationPolicy.DISABLE; QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace, namespaceSizeLimit, namespaceViolationPolicy); admin.setQuota(namespaceSettings); - writeData(tn1, 2L * ONE_MEGABYTE); + helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); admin.flush(tn1); - Map<TableName,SpaceViolationPolicy> violatedQuotas = violationNotifier.snapshotTablesInViolation(); + Map<TableName,SpaceQuotaSnapshot> violatedQuotas = snapshotNotifier.copySnapshots(); for (int i = 0; i < 5; i++) { // Check a few times to make sure we don't prematurely move to violation assertEquals("Should not see any quota violations after writing 2MB of data", 0, @@ -279,12 +282,12 @@ public class TestQuotaObserverChoreWithMiniCluster { } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping." , e); } - violatedQuotas = violationNotifier.snapshotTablesInViolation(); + violatedQuotas = snapshotNotifier.copySnapshots(); } - writeData(tn2, 2L * ONE_MEGABYTE); + helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); admin.flush(tn2); - violatedQuotas = violationNotifier.snapshotTablesInViolation(); + violatedQuotas = snapshotNotifier.copySnapshots(); while (violatedQuotas.size() < 2) { LOG.debug("Saw fewer violations than desired (expected 2): " + violatedQuotas + ". 
Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); @@ -294,18 +297,18 @@ public class TestQuotaObserverChoreWithMiniCluster { LOG.debug("Interrupted while sleeping.", e); Thread.currentThread().interrupt(); } - violatedQuotas = violationNotifier.snapshotTablesInViolation(); + violatedQuotas = snapshotNotifier.copySnapshots(); } - SpaceViolationPolicy actualPolicyTN1 = violatedQuotas.get(tn1); + SpaceQuotaSnapshot actualPolicyTN1 = violatedQuotas.get(tn1); assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1); - assertEquals(namespaceViolationPolicy, actualPolicyTN1); - SpaceViolationPolicy actualPolicyTN2 = violatedQuotas.get(tn2); + assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy()); + SpaceQuotaSnapshot actualPolicyTN2 = violatedQuotas.get(tn2); assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); - assertEquals(namespaceViolationPolicy, actualPolicyTN2); + assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy()); // Override the namespace quota with a table quota - final long tableSizeLimit = ONE_MEGABYTE; + final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE; final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS; QuotaSettings tableSettings = QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit, tableViolationPolicy); @@ -313,10 +316,10 @@ public class TestQuotaObserverChoreWithMiniCluster { // Keep checking for the table quota policy to override the namespace quota while (true) { - violatedQuotas = violationNotifier.snapshotTablesInViolation(); - SpaceViolationPolicy actualTableViolationPolicy = violatedQuotas.get(tn1); - assertNotNull("Violation policy should never be null", actualTableViolationPolicy); - if (tableViolationPolicy != actualTableViolationPolicy) { + violatedQuotas = snapshotNotifier.copySnapshots(); + SpaceQuotaSnapshot actualTableSnapshot = violatedQuotas.get(tn1); + 
assertNotNull("Violation policy should never be null", actualTableSnapshot); + if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy()) { LOG.debug("Saw unexpected table violation policy, waiting and re-checking."); try { Thread.sleep(DEFAULT_WAIT_MILLIS); @@ -326,23 +329,23 @@ public class TestQuotaObserverChoreWithMiniCluster { } continue; } - assertEquals(tableViolationPolicy, actualTableViolationPolicy); + assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy()); break; } // This should not change with the introduction of the table quota for tn1 actualPolicyTN2 = violatedQuotas.get(tn2); assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); - assertEquals(namespaceViolationPolicy, actualPolicyTN2); + assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy()); } @Test public void testGetAllTablesWithQuotas() throws Exception { - final Multimap<TableName, QuotaSettings> quotas = createTablesWithSpaceQuotas(); + final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas(); Set<TableName> tablesWithQuotas = new HashSet<>(); Set<TableName> namespaceTablesWithQuotas = new HashSet<>(); // Partition the tables with quotas by table and ns quota - partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); + helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined(); assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables()); @@ -351,13 +354,13 @@ public class TestQuotaObserverChoreWithMiniCluster { @Test public void testRpcQuotaTablesAreFiltered() throws Exception { - final Multimap<TableName, QuotaSettings> quotas = createTablesWithSpaceQuotas(); + final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas(); Set<TableName> tablesWithQuotas = new HashSet<>(); Set<TableName> 
namespaceTablesWithQuotas = new HashSet<>(); // Partition the tables with quotas by table and ns quota - partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); + helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); - TableName rpcQuotaTable = createTable(); + TableName rpcQuotaTable = helper.createTable(); TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory .throttleTable(rpcQuotaTable, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES)); @@ -375,7 +378,7 @@ public class TestQuotaObserverChoreWithMiniCluster { TablesWithQuotas tables = new TablesWithQuotas(TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration()) { @Override - int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore) { + int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) { Integer i = mockReportedRegions.get(table); if (null == i) { return 0; @@ -385,9 +388,9 @@ public class TestQuotaObserverChoreWithMiniCluster { }; // Create the tables - TableName tn1 = createTableWithRegions(20); - TableName tn2 = createTableWithRegions(20); - TableName tn3 = createTableWithRegions(20); + TableName tn1 = helper.createTableWithRegions(20); + TableName tn2 = helper.createTableWithRegions(20); + TableName tn3 = helper.createTableWithRegions(20); // Add them to the Tables with Quotas object tables.addTableQuotaTable(tn1); @@ -407,9 +410,9 @@ public class TestQuotaObserverChoreWithMiniCluster { @Test public void testFetchSpaceQuota() throws Exception { - Multimap<TableName,QuotaSettings> tables = createTablesWithSpaceQuotas(); + Multimap<TableName,QuotaSettings> tables = helper.createTablesWithSpaceQuotas(); // Can pass in an empty map, we're not consulting it. - chore.initializeViolationStores(Collections.emptyMap()); + chore.initializeSnapshotStores(Collections.emptyMap()); // All tables that were created should have a quota defined. 
for (Entry<TableName,QuotaSettings> entry : tables.entries()) { final TableName table = entry.getKey(); @@ -420,10 +423,10 @@ public class TestQuotaObserverChoreWithMiniCluster { SpaceQuota spaceQuota = null; if (null != qs.getTableName()) { - spaceQuota = chore.getTableViolationStore().getSpaceQuota(table); + spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table); assertNotNull("Could not find table space quota for " + table, spaceQuota); } else if (null != qs.getNamespace()) { - spaceQuota = chore.getNamespaceViolationStore().getSpaceQuota(table.getNamespaceAsString()); + spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString()); assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(), spaceQuota); } else { fail("Expected table or namespace space quota"); @@ -433,166 +436,16 @@ public class TestQuotaObserverChoreWithMiniCluster { assertEquals(sls.getProto().getQuota(), spaceQuota); } - TableName tableWithoutQuota = createTable(); - assertNull(chore.getTableViolationStore().getSpaceQuota(tableWithoutQuota)); + TableName tableWithoutQuota = helper.createTable(); + assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota)); } - // - // Helpers - // - - void writeData(TableName tn, long sizeInBytes) throws IOException { - final Connection conn = TEST_UTIL.getConnection(); - final Table table = conn.getTable(tn); + private void sleepWithInterrupt(long millis) { try { - List<Put> updates = new ArrayList<>(); - long bytesToWrite = sizeInBytes; - long rowKeyId = 0L; - final StringBuilder sb = new StringBuilder(); - final Random r = new Random(); - while (bytesToWrite > 0L) { - sb.setLength(0); - sb.append(Long.toString(rowKeyId)); - // Use the reverse counter as the rowKey to get even spread across all regions - Put p = new Put(Bytes.toBytes(sb.reverse().toString())); - byte[] value = new byte[SIZE_PER_VALUE]; - r.nextBytes(value); - p.addColumn(Bytes.toBytes(F1), 
Bytes.toBytes("q1"), value); - updates.add(p); - - // Batch 50K worth of updates - if (updates.size() > 50) { - table.put(updates); - updates.clear(); - } - - // Just count the value size, ignore the size of rowkey + column - bytesToWrite -= SIZE_PER_VALUE; - rowKeyId++; - } - - // Write the final batch - if (!updates.isEmpty()) { - table.put(updates); - } - - LOG.debug("Data was written to HBase"); - // Push the data to disk. - TEST_UTIL.getAdmin().flush(tn); - LOG.debug("Data flushed to disk"); - } finally { - table.close(); - } - } - - Multimap<TableName, QuotaSettings> createTablesWithSpaceQuotas() throws Exception { - final Admin admin = TEST_UTIL.getAdmin(); - final Multimap<TableName, QuotaSettings> tablesWithQuotas = HashMultimap.create(); - - final TableName tn1 = createTable(); - final TableName tn2 = createTable(); - - NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + COUNTER.getAndIncrement()).build(); - admin.createNamespace(nd); - final TableName tn3 = createTableInNamespace(nd); - final TableName tn4 = createTableInNamespace(nd); - final TableName tn5 = createTableInNamespace(nd); - - final long sizeLimit1 = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB - final SpaceViolationPolicy violationPolicy1 = SpaceViolationPolicy.NO_WRITES; - QuotaSettings qs1 = QuotaSettingsFactory.limitTableSpace(tn1, sizeLimit1, violationPolicy1); - tablesWithQuotas.put(tn1, qs1); - admin.setQuota(qs1); - - final long sizeLimit2 = 1024L * 1024L * 1024L * 200L; // 200GB - final SpaceViolationPolicy violationPolicy2 = SpaceViolationPolicy.NO_WRITES_COMPACTIONS; - QuotaSettings qs2 = QuotaSettingsFactory.limitTableSpace(tn2, sizeLimit2, violationPolicy2); - tablesWithQuotas.put(tn2, qs2); - admin.setQuota(qs2); - - final long sizeLimit3 = 1024L * 1024L * 1024L * 1024L * 100L; // 100TB - final SpaceViolationPolicy violationPolicy3 = SpaceViolationPolicy.NO_INSERTS; - QuotaSettings qs3 = QuotaSettingsFactory.limitNamespaceSpace(nd.getName(), sizeLimit3, violationPolicy3); 
- tablesWithQuotas.put(tn3, qs3); - tablesWithQuotas.put(tn4, qs3); - tablesWithQuotas.put(tn5, qs3); - admin.setQuota(qs3); - - final long sizeLimit4 = 1024L * 1024L * 1024L * 5L; // 5GB - final SpaceViolationPolicy violationPolicy4 = SpaceViolationPolicy.NO_INSERTS; - QuotaSettings qs4 = QuotaSettingsFactory.limitTableSpace(tn5, sizeLimit4, violationPolicy4); - // Override the ns quota for tn5, import edge-case to catch table quota taking - // precedence over ns quota. - tablesWithQuotas.put(tn5, qs4); - admin.setQuota(qs4); - - return tablesWithQuotas; - } - - TableName createTable() throws Exception { - return createTableWithRegions(1); - } - - TableName createTableWithRegions(int numRegions) throws Exception { - return createTableWithRegions(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR, numRegions); - } - - TableName createTableWithRegions(String namespace, int numRegions) throws Exception { - final Admin admin = TEST_UTIL.getAdmin(); - final TableName tn = TableName.valueOf(namespace, testName.getMethodName() + COUNTER.getAndIncrement()); - - // Delete the old table - if (admin.tableExists(tn)) { - admin.disableTable(tn); - admin.deleteTable(tn); - } - - // Create the table - HTableDescriptor tableDesc = new HTableDescriptor(tn); - tableDesc.addFamily(new HColumnDescriptor(F1)); - if (numRegions == 1) { - admin.createTable(tableDesc); - } else { - admin.createTable(tableDesc, Bytes.toBytes("0"), Bytes.toBytes("9"), numRegions); - } - return tn; - } - - TableName createTableInNamespace(NamespaceDescriptor nd) throws Exception { - final Admin admin = TEST_UTIL.getAdmin(); - final TableName tn = TableName.valueOf(nd.getName(), - testName.getMethodName() + COUNTER.getAndIncrement()); - - // Delete the old table - if (admin.tableExists(tn)) { - admin.disableTable(tn); - admin.deleteTable(tn); - } - - // Create the table - HTableDescriptor tableDesc = new HTableDescriptor(tn); - tableDesc.addFamily(new HColumnDescriptor(F1)); - - admin.createTable(tableDesc); - 
return tn; - } - - void partitionTablesByQuotaTarget(Multimap<TableName,QuotaSettings> quotas, - Set<TableName> tablesWithTableQuota, Set<TableName> tablesWithNamespaceQuota) { - // Partition the tables with quotas by table and ns quota - for (Entry<TableName, QuotaSettings> entry : quotas.entries()) { - SpaceLimitSettings settings = (SpaceLimitSettings) entry.getValue(); - TableName tn = entry.getKey(); - if (null != settings.getTableName()) { - tablesWithTableQuota.add(tn); - } - if (null != settings.getNamespace()) { - tablesWithNamespaceQuota.add(tn); - } - - if (null == settings.getTableName() && null == settings.getNamespace()) { - fail("Unexpected table name with null tableName and namespace: " + tn); - } + Thread.sleep(millis); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping"); + Thread.currentThread().interrupt(); } } }
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java index 55f671a..f10cdef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle; @@ -201,26 +202,29 @@ public class TestQuotaTableUtil { @Test public void testSerDeViolationPolicies() throws Exception { final TableName tn1 = getUniqueTableName(); - final SpaceViolationPolicy policy1 = SpaceViolationPolicy.DISABLE; + final SpaceQuotaSnapshot snapshot1 = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 512L, 1024L); final TableName tn2 = getUniqueTableName(); - final SpaceViolationPolicy policy2 = SpaceViolationPolicy.NO_INSERTS; + final SpaceQuotaSnapshot snapshot2 = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 512L, 1024L); final TableName tn3 = getUniqueTableName(); - final SpaceViolationPolicy policy3 = SpaceViolationPolicy.NO_WRITES; + final SpaceQuotaSnapshot snapshot3 = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 512L, 1024L); List<Put> puts = new ArrayList<>(); - 
puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn1, policy1)); - puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn2, policy2)); - puts.add(QuotaTableUtil.createEnableViolationPolicyUpdate(tn3, policy3)); - final Map<TableName,SpaceViolationPolicy> expectedPolicies = new HashMap<>(); - expectedPolicies.put(tn1, policy1); - expectedPolicies.put(tn2, policy2); - expectedPolicies.put(tn3, policy3); - - final Map<TableName,SpaceViolationPolicy> actualPolicies = new HashMap<>(); + puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn1, snapshot1)); + puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn2, snapshot2)); + puts.add(QuotaTableUtil.createPutSpaceSnapshot(tn3, snapshot3)); + final Map<TableName,SpaceQuotaSnapshot> expectedPolicies = new HashMap<>(); + expectedPolicies.put(tn1, snapshot1); + expectedPolicies.put(tn2, snapshot2); + expectedPolicies.put(tn3, snapshot3); + + final Map<TableName,SpaceQuotaSnapshot> actualPolicies = new HashMap<>(); try (Table quotaTable = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { quotaTable.put(puts); - ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan()); + ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan()); for (Result r : scanner) { - QuotaTableUtil.extractViolationPolicy(r, actualPolicies); + QuotaTableUtil.extractQuotaSnapshot(r, actualPolicies); } scanner.close(); } @@ -231,4 +235,4 @@ public class TestQuotaTableUtil { private TableName getUniqueTableName() { return TableName.valueOf(testName.getMethodName() + "_" + tableNameCounter++); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java index e5ab317..38656e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java @@ -16,33 +16,29 @@ */ package org.apache.hadoop.hbase.quotas; -import static org.apache.hadoop.hbase.util.Bytes.toBytes; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.HashMap; import java.util.Map; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; +import org.apache.hadoop.hbase.quotas.policies.DisableTableViolationPolicyEnforcement; +import org.apache.hadoop.hbase.quotas.policies.NoInsertsViolationPolicyEnforcement; +import org.apache.hadoop.hbase.quotas.policies.NoWritesCompactionsViolationPolicyEnforcement; +import org.apache.hadoop.hbase.quotas.policies.NoWritesViolationPolicyEnforcement; +import org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import 
org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; /** * Test class for {@link RegionServerSpaceQuotaManager}. @@ -51,77 +47,105 @@ import org.mockito.stubbing.Answer; public class TestRegionServerSpaceQuotaManager { private RegionServerSpaceQuotaManager quotaManager; - private Connection conn; - private Table quotaTable; - private ResultScanner scanner; + private RegionServerServices rss; @Before - @SuppressWarnings("unchecked") public void setup() throws Exception { quotaManager = mock(RegionServerSpaceQuotaManager.class); - conn = mock(Connection.class); - quotaTable = mock(Table.class); - scanner = mock(ResultScanner.class); - // Call the real getViolationPoliciesToEnforce() - when(quotaManager.getViolationPoliciesToEnforce()).thenCallRealMethod(); - // Mock out creating a scanner - when(quotaManager.getConnection()).thenReturn(conn); - when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable); - when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner); - // Mock out the static method call with some indirection - doAnswer(new Answer<Void>(){ - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Result result = invocation.getArgumentAt(0, Result.class); - Map<TableName,SpaceViolationPolicy> policies = invocation.getArgumentAt(1, Map.class); - QuotaTableUtil.extractViolationPolicy(result, policies); - return null; - } - }).when(quotaManager).extractViolationPolicy(any(Result.class), any(Map.class)); + rss = mock(RegionServerServices.class); } @Test - public void testMissingAllColumns() { - List<Result> results = new ArrayList<>(); - results.add(Result.create(Collections.emptyList())); - when(scanner.iterator()).thenReturn(results.iterator()); - try { - quotaManager.getViolationPoliciesToEnforce(); - fail("Expected an 
IOException, but did not receive one."); - } catch (IOException e) { - // Expected an error because we had no cells in the row. - // This should only happen due to programmer error. - } + public void testSpacePoliciesFromEnforcements() { + final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = new HashMap<>(); + final Map<TableName, SpaceQuotaSnapshot> expectedPolicies = new HashMap<>(); + when(quotaManager.copyActiveEnforcements()).thenReturn(enforcements); + when(quotaManager.getActivePoliciesAsMap()).thenCallRealMethod(); + + NoInsertsViolationPolicyEnforcement noInsertsPolicy = new NoInsertsViolationPolicyEnforcement(); + SpaceQuotaSnapshot noInsertsSnapshot = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 256L, 1024L); + noInsertsPolicy.initialize(rss, TableName.valueOf("no_inserts"), noInsertsSnapshot); + enforcements.put(noInsertsPolicy.getTableName(), noInsertsPolicy); + expectedPolicies.put(noInsertsPolicy.getTableName(), noInsertsSnapshot); + + NoWritesViolationPolicyEnforcement noWritesPolicy = new NoWritesViolationPolicyEnforcement(); + SpaceQuotaSnapshot noWritesSnapshot = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 512L, 2048L); + noWritesPolicy.initialize(rss, TableName.valueOf("no_writes"), noWritesSnapshot); + enforcements.put(noWritesPolicy.getTableName(), noWritesPolicy); + expectedPolicies.put(noWritesPolicy.getTableName(), noWritesSnapshot); + + NoWritesCompactionsViolationPolicyEnforcement noWritesCompactionsPolicy = + new NoWritesCompactionsViolationPolicyEnforcement(); + SpaceQuotaSnapshot noWritesCompactionsSnapshot = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 1024L, 4096L); + noWritesCompactionsPolicy.initialize( + rss, TableName.valueOf("no_writes_compactions"), noWritesCompactionsSnapshot); + enforcements.put(noWritesCompactionsPolicy.getTableName(), noWritesCompactionsPolicy); + 
expectedPolicies.put(noWritesCompactionsPolicy.getTableName(), + noWritesCompactionsSnapshot); + + DisableTableViolationPolicyEnforcement disablePolicy = + new DisableTableViolationPolicyEnforcement(); + SpaceQuotaSnapshot disableSnapshot = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 2048L, 8192L); + disablePolicy.initialize(rss, TableName.valueOf("disable"), disableSnapshot); + enforcements.put(disablePolicy.getTableName(), disablePolicy); + expectedPolicies.put(disablePolicy.getTableName(), disableSnapshot); + + enforcements.put( + TableName.valueOf("no_policy"), new BulkLoadVerifyingViolationPolicyEnforcement()); + + Map<TableName, SpaceQuotaSnapshot> actualPolicies = quotaManager.getActivePoliciesAsMap(); + assertEquals(expectedPolicies, actualPolicies); } @Test - public void testMissingDesiredColumn() { - List<Result> results = new ArrayList<>(); - // Give a column that isn't the one we want - Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("q"), toBytes("s"), new byte[0]); - results.add(Result.create(Collections.singletonList(c))); - when(scanner.iterator()).thenReturn(results.iterator()); - try { - quotaManager.getViolationPoliciesToEnforce(); - fail("Expected an IOException, but did not receive one."); - } catch (IOException e) { - // Expected an error because we were missing the column we expected in this row. - // This should only happen due to programmer error. 
- } + public void testExceptionOnPolicyEnforcementEnable() throws Exception { + final TableName tableName = TableName.valueOf("foo"); + final SpaceQuotaSnapshot snapshot = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 2048L); + RegionServerServices rss = mock(RegionServerServices.class); + SpaceViolationPolicyEnforcementFactory factory = mock( + SpaceViolationPolicyEnforcementFactory.class); + SpaceViolationPolicyEnforcement enforcement = mock(SpaceViolationPolicyEnforcement.class); + RegionServerSpaceQuotaManager realManager = new RegionServerSpaceQuotaManager(rss, factory); + + when(factory.create(rss, tableName, snapshot)).thenReturn(enforcement); + doThrow(new IOException("Failed for test!")).when(enforcement).enable(); + + realManager.enforceViolationPolicy(tableName, snapshot); + Map<TableName, SpaceViolationPolicyEnforcement> enforcements = + realManager.copyActiveEnforcements(); + assertTrue("Expected active enforcements to be empty, but were " + enforcements, + enforcements.isEmpty()); } @Test - public void testParsingError() { - List<Result> results = new ArrayList<>(); - Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("u"), toBytes("v"), new byte[0]); - results.add(Result.create(Collections.singletonList(c))); - when(scanner.iterator()).thenReturn(results.iterator()); - try { - quotaManager.getViolationPoliciesToEnforce(); - fail("Expected an IOException, but did not receive one."); - } catch (IOException e) { - // We provided a garbage serialized protobuf message (empty byte array), this should - // in turn throw an IOException - } + public void testExceptionOnPolicyEnforcementDisable() throws Exception { + final TableName tableName = TableName.valueOf("foo"); + final SpaceQuotaSnapshot snapshot = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 2048L); + RegionServerServices rss = mock(RegionServerServices.class); + SpaceViolationPolicyEnforcementFactory factory = mock( 
+ SpaceViolationPolicyEnforcementFactory.class); + SpaceViolationPolicyEnforcement enforcement = mock(SpaceViolationPolicyEnforcement.class); + RegionServerSpaceQuotaManager realManager = new RegionServerSpaceQuotaManager(rss, factory); + + when(factory.create(rss, tableName, snapshot)).thenReturn(enforcement); + doNothing().when(enforcement).enable(); + doThrow(new IOException("Failed for test!")).when(enforcement).disable(); + + // Enabling should work + realManager.enforceViolationPolicy(tableName, snapshot); + Map<TableName, SpaceViolationPolicyEnforcement> enforcements = + realManager.copyActiveEnforcements(); + assertEquals(1, enforcements.size()); + + // If the disable fails, we should still treat it as "active" + realManager.disableViolationPolicyEnforcement(tableName); + enforcements = realManager.copyActiveEnforcements(); + assertEquals(1, enforcements.size()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java index 160de46..7f0f9ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java @@ -16,20 +16,34 @@ */ package org.apache.hadoop.hbase.quotas; +import static org.apache.hadoop.hbase.util.Bytes.toBytes; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import 
static org.mockito.Mockito.when; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Before; @@ -37,42 +51,62 @@ import org.junit.Test; import org.junit.experimental.categories.Category; /** - * Test class for {@link SpaceQuotaViolationPolicyRefresherChore}. + * Test class for {@link SpaceQuotaRefresherChore}. 
*/ @Category(SmallTests.class) public class TestSpaceQuotaViolationPolicyRefresherChore { private RegionServerSpaceQuotaManager manager; private RegionServerServices rss; - private SpaceQuotaViolationPolicyRefresherChore chore; + private SpaceQuotaRefresherChore chore; private Configuration conf; + private Connection conn; + @SuppressWarnings("unchecked") @Before - public void setup() { + public void setup() throws IOException { conf = HBaseConfiguration.create(); rss = mock(RegionServerServices.class); manager = mock(RegionServerSpaceQuotaManager.class); + conn = mock(Connection.class); when(manager.getRegionServerServices()).thenReturn(rss); when(rss.getConfiguration()).thenReturn(conf); - chore = new SpaceQuotaViolationPolicyRefresherChore(manager); + + + chore = mock(SpaceQuotaRefresherChore.class); + when(chore.getConnection()).thenReturn(conn); + when(chore.getManager()).thenReturn(manager); + doCallRealMethod().when(chore).chore(); + when(chore.isInViolation(any(SpaceQuotaSnapshot.class))).thenCallRealMethod(); + doCallRealMethod().when(chore).extractQuotaSnapshot(any(Result.class), any(Map.class)); } @Test public void testPoliciesAreEnforced() throws IOException { - final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>(); - policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE); - policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS); - policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES); - policiesToEnforce.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES_COMPACTIONS); + // Create a number of policies that should be enforced (usage > limit) + final Map<TableName,SpaceQuotaSnapshot> policiesToEnforce = new HashMap<>(); + policiesToEnforce.put( + TableName.valueOf("table1"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 512L)); + policiesToEnforce.put( + TableName.valueOf("table2"), + new 
SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 2048L, 512L)); + policiesToEnforce.put( + TableName.valueOf("table3"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 4096L, 512L)); + policiesToEnforce.put( + TableName.valueOf("table4"), + new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 8192L, 512L)); // No active enforcements - when(manager.getActiveViolationPolicyEnforcements()).thenReturn(Collections.emptyMap()); + when(manager.copyQuotaSnapshots()).thenReturn(Collections.emptyMap()); // Policies to enforce - when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce); + when(chore.fetchSnapshotsFromQuotaTable()).thenReturn(policiesToEnforce); chore.chore(); - for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) { + for (Entry<TableName,SpaceQuotaSnapshot> entry : policiesToEnforce.entrySet()) { // Ensure we enforce the policy verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue()); // Don't disable any policies @@ -82,50 +116,135 @@ public class TestSpaceQuotaViolationPolicyRefresherChore { @Test public void testOldPoliciesAreRemoved() throws IOException { - final Map<TableName,SpaceViolationPolicy> policiesToEnforce = new HashMap<>(); - policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE); - policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS); - - final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>(); - previousPolicies.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES); - previousPolicies.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES); + final Map<TableName,SpaceQuotaSnapshot> previousPolicies = new HashMap<>(); + previousPolicies.put( + TableName.valueOf("table3"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 4096L, 512L)); + previousPolicies.put( + 
TableName.valueOf("table4"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 8192L, 512L)); + + final Map<TableName,SpaceQuotaSnapshot> policiesToEnforce = new HashMap<>(); + policiesToEnforce.put( + TableName.valueOf("table1"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 512L)); + policiesToEnforce.put( + TableName.valueOf("table2"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 2048L, 512L)); + policiesToEnforce.put( + TableName.valueOf("table3"), + new SpaceQuotaSnapshot(SpaceQuotaStatus.notInViolation(), 256L, 512L)); + policiesToEnforce.put( + TableName.valueOf("table4"), + new SpaceQuotaSnapshot(SpaceQuotaStatus.notInViolation(), 128L, 512L)); // No active enforcements - when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies); + when(manager.copyQuotaSnapshots()).thenReturn(previousPolicies); // Policies to enforce - when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce); + when(chore.fetchSnapshotsFromQuotaTable()).thenReturn(policiesToEnforce); chore.chore(); - for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) { - verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue()); - } + verify(manager).enforceViolationPolicy( + TableName.valueOf("table1"), policiesToEnforce.get(TableName.valueOf("table1"))); + verify(manager).enforceViolationPolicy( + TableName.valueOf("table2"), policiesToEnforce.get(TableName.valueOf("table2"))); - for (Entry<TableName,SpaceViolationPolicy> entry : previousPolicies.entrySet()) { - verify(manager).disableViolationPolicyEnforcement(entry.getKey()); - } + verify(manager).disableViolationPolicyEnforcement(TableName.valueOf("table3")); + verify(manager).disableViolationPolicyEnforcement(TableName.valueOf("table4")); } @Test public void testNewPolicyOverridesOld() throws IOException { - final Map<TableName,SpaceViolationPolicy> 
policiesToEnforce = new HashMap<>(); - policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE); - policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_WRITES); - policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_INSERTS); - - final Map<TableName,SpaceViolationPolicy> previousPolicies = new HashMap<>(); - previousPolicies.put(TableName.valueOf("table1"), SpaceViolationPolicy.NO_WRITES); + final Map<TableName,SpaceQuotaSnapshot> policiesToEnforce = new HashMap<>(); + policiesToEnforce.put( + TableName.valueOf("table1"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 512L)); + policiesToEnforce.put( + TableName.valueOf("table2"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 2048L, 512L)); + policiesToEnforce.put( + TableName.valueOf("table3"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 4096L, 512L)); + + final Map<TableName,SpaceQuotaSnapshot> previousPolicies = new HashMap<>(); + previousPolicies.put( + TableName.valueOf("table1"), + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES), 8192L, 512L)); // No active enforcements - when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies); + when(manager.getActivePoliciesAsMap()).thenReturn(previousPolicies); // Policies to enforce - when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce); + when(chore.fetchSnapshotsFromQuotaTable()).thenReturn(policiesToEnforce); chore.chore(); - for (Entry<TableName,SpaceViolationPolicy> entry : policiesToEnforce.entrySet()) { + for (Entry<TableName,SpaceQuotaSnapshot> entry : policiesToEnforce.entrySet()) { verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue()); } verify(manager, never()).disableViolationPolicyEnforcement(TableName.valueOf("table1")); } + + @Test + public void testMissingAllColumns() throws IOException { + 
when(chore.fetchSnapshotsFromQuotaTable()).thenCallRealMethod(); + ResultScanner scanner = mock(ResultScanner.class); + Table quotaTable = mock(Table.class); + when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable); + when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner); + + List<Result> results = new ArrayList<>(); + results.add(Result.create(Collections.emptyList())); + when(scanner.iterator()).thenReturn(results.iterator()); + try { + chore.fetchSnapshotsFromQuotaTable(); + fail("Expected an IOException, but did not receive one."); + } catch (IOException e) { + // Expected an error because we had no cells in the row. + // This should only happen due to programmer error. + } + } + + @Test + public void testMissingDesiredColumn() throws IOException { + when(chore.fetchSnapshotsFromQuotaTable()).thenCallRealMethod(); + ResultScanner scanner = mock(ResultScanner.class); + Table quotaTable = mock(Table.class); + when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable); + when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner); + + List<Result> results = new ArrayList<>(); + // Give a column that isn't the one we want + Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("q"), toBytes("s"), new byte[0]); + results.add(Result.create(Collections.singletonList(c))); + when(scanner.iterator()).thenReturn(results.iterator()); + try { + chore.fetchSnapshotsFromQuotaTable(); + fail("Expected an IOException, but did not receive one."); + } catch (IOException e) { + // Expected an error because we were missing the column we expected in this row. + // This should only happen due to programmer error. 
+ } + } + + @Test + public void testParsingError() throws IOException { + when(chore.fetchSnapshotsFromQuotaTable()).thenCallRealMethod(); + ResultScanner scanner = mock(ResultScanner.class); + Table quotaTable = mock(Table.class); + when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable); + when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner); + + List<Result> results = new ArrayList<>(); + Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("u"), toBytes("v"), new byte[0]); + results.add(Result.create(Collections.singletonList(c))); + when(scanner.iterator()).thenReturn(results.iterator()); + try { + chore.fetchSnapshotsFromQuotaTable(); + fail("Expected an IOException, but did not receive one."); + } catch (IOException e) { + // We provided a garbage serialized protobuf message (empty byte array), this should + // in turn throw an IOException + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java new file mode 100644 index 0000000..ea7dcf1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.ClientServiceCallable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.client.Scan; +import 
org.apache.hadoop.hbase.client.SecureBulkLoadClient; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.quotas.policies.BulkLoadVerifyingViolationPolicyEnforcement; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; +import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.StringUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * End-to-end test class for filesystem space quotas. + */ +@Category(LargeTests.class) +public class TestSpaceQuotas { + private static final Log LOG = LogFactory.getLog(TestSpaceQuotas.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final AtomicLong COUNTER = new AtomicLong(0); + private static final int NUM_RETRIES = 10; + + @Rule + public TestName testName = new TestName(); + private SpaceQuotaHelperForTests helper; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + // Increase the frequency of some of the chores for responsiveness of the test + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); + conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); + conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); + conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000); + 
conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000); + conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void removeAllQuotas() throws Exception { + final Connection conn = TEST_UTIL.getConnection(); + // Wait for the quota table to be created + if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { + do { + LOG.debug("Quota table does not yet exist"); + Thread.sleep(1000); + } while (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)); + } else { + // Or, clean up any quotas from previous test runs. + QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration()); + for (QuotaSettings quotaSettings : scanner) { + final String namespace = quotaSettings.getNamespace(); + final TableName tableName = quotaSettings.getTableName(); + if (null != namespace) { + LOG.debug("Deleting quota for namespace: " + namespace); + QuotaUtil.deleteNamespaceQuota(conn, namespace); + } else { + assert null != tableName; + LOG.debug("Deleting quota for table: "+ tableName); + QuotaUtil.deleteTableQuota(conn, tableName); + } + } + } + helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); + } + + @Test + public void testNoInsertsWithPut() throws Exception { + Put p = new Put(Bytes.toBytes("to_reject")); + p.addColumn( + Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); + writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, p); + } + + @Test + public void testNoInsertsWithAppend() throws Exception { + Append a = new Append(Bytes.toBytes("to_reject")); + a.add( + Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); + writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, a); + } + + @Test + public void testNoInsertsWithIncrement() throws Exception { + 
Increment i = new Increment(Bytes.toBytes("to_reject")); + i.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0); + writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, i); + } + + @Test + public void testDeletesAfterNoInserts() throws Exception { + final TableName tn = writeUntilViolation(SpaceViolationPolicy.NO_INSERTS); + // Try a couple of times to verify that the quota never gets enforced, same as we + // do when we're trying to catch the failure. + Delete d = new Delete(Bytes.toBytes("should_not_be_rejected")); + for (int i = 0; i < NUM_RETRIES; i++) { + try (Table t = TEST_UTIL.getConnection().getTable(tn)) { + t.delete(d); + } + } + } + + @Test + public void testNoWritesWithPut() throws Exception { + Put p = new Put(Bytes.toBytes("to_reject")); + p.addColumn( + Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); + writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p); + } + + @Test + public void testNoWritesWithAppend() throws Exception { + Append a = new Append(Bytes.toBytes("to_reject")); + a.add( + Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); + writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, a); + } + + @Test + public void testNoWritesWithIncrement() throws Exception { + Increment i = new Increment(Bytes.toBytes("to_reject")); + i.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0); + writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, i); + } + + @Test + public void testNoWritesWithDelete() throws Exception { + Delete d = new Delete(Bytes.toBytes("to_reject")); + writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, d); + } + + @Test + public void testNoCompactions() throws Exception { + Put p = new Put(Bytes.toBytes("to_reject")); + p.addColumn( + Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 
Bytes.toBytes("reject")); + final TableName tn = writeUntilViolationAndVerifyViolation( + SpaceViolationPolicy.NO_WRITES_COMPACTIONS, p); + // We know the policy is active at this point + + // Major compactions should be rejected + try { + TEST_UTIL.getAdmin().majorCompact(tn); + fail("Expected that invoking the compaction should throw an Exception"); + } catch (DoNotRetryIOException e) { + // Expected! + } + // Minor compactions should also be rejected. + try { + TEST_UTIL.getAdmin().compact(tn); + fail("Expected that invoking the compaction should throw an Exception"); + } catch (DoNotRetryIOException e) { + // Expected! + } + } + + @Test + public void testNoEnableAfterDisablePolicy() throws Exception { + Put p = new Put(Bytes.toBytes("to_reject")); + p.addColumn( + Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); + final TableName tn = writeUntilViolation(SpaceViolationPolicy.DISABLE); + final Admin admin = TEST_UTIL.getAdmin(); + // Disabling a table relies on some external action (over the other policies), so wait a bit + // more than the other tests. + for (int i = 0; i < NUM_RETRIES * 2; i++) { + if (admin.isTableEnabled(tn)) { + LOG.info(tn + " is still enabled, expecting it to be disabled. 
Will wait and re-check."); + Thread.sleep(2000); + } + } + assertFalse(tn + " is still enabled but it should be disabled", admin.isTableEnabled(tn)); + try { + admin.enableTable(tn); + } catch (AccessDeniedException e) { + String exceptionContents = StringUtils.stringifyException(e); + final String expectedText = "violated space quota"; + assertTrue("Expected the exception to contain " + expectedText + ", but was: " + + exceptionContents, exceptionContents.contains(expectedText)); + } + } + + @Test(timeout=120000) + public void testNoBulkLoadsWithNoWrites() throws Exception { + Put p = new Put(Bytes.toBytes("to_reject")); + p.addColumn( + Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject")); + TableName tableName = writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p); + + // The table is now in violation. Try to do a bulk load + ClientServiceCallable<Void> callable = generateFileToLoad(tableName, 1, 50); + RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); + RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); + try { + caller.callWithRetries(callable, Integer.MAX_VALUE); + fail("Expected the bulk load call to fail!"); + } catch (SpaceLimitingException e) { + // Pass + LOG.trace("Caught expected exception", e); + } + } + + @Test(timeout=120000) + public void testAtomicBulkLoadUnderQuota() throws Exception { + // Need to verify that if the batch of hfiles cannot be loaded, none are loaded. 
+ TableName tn = helper.createTableWithRegions(10); + + final long sizeLimit = 50L * SpaceQuotaHelperForTests.ONE_KILOBYTE; + QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( + tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS); + TEST_UTIL.getAdmin().setQuota(settings); + + HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + RegionServerSpaceQuotaManager spaceQuotaManager = rs.getRegionServerSpaceQuotaManager(); + Map<TableName,SpaceQuotaSnapshot> snapshots = spaceQuotaManager.copyQuotaSnapshots(); + Map<HRegionInfo,Long> regionSizes = getReportedSizesForTable(tn); + /* Re-read the snapshots every 3 seconds until the one for tn exists and reports a positive limit; the loop itself has no iteration cap. */ while (true) { + SpaceQuotaSnapshot snapshot = snapshots.get(tn); + if (null != snapshot && snapshot.getLimit() > 0) { + break; + } + LOG.debug( + "Snapshot does not yet realize quota limit: " + snapshots + ", regionsizes: " + + regionSizes); + Thread.sleep(3000); + snapshots = spaceQuotaManager.copyQuotaSnapshots(); + regionSizes = getReportedSizesForTable(tn); + } + // Our quota limit should be reflected in the latest snapshot + SpaceQuotaSnapshot snapshot = snapshots.get(tn); + assertEquals(0L, snapshot.getUsage()); + assertEquals(sizeLimit, snapshot.getLimit()); + + // We would also not have a "real" policy in violation + ActivePolicyEnforcement activePolicies = spaceQuotaManager.getActiveEnforcements(); + SpaceViolationPolicyEnforcement enforcement = activePolicies.getPolicyEnforcement(tn); + /* NOTE(review): the failure message says "Noop policy" while the check is for BulkLoadVerifyingViolationPolicyEnforcement; confirm which wording is intended. */ assertTrue( + "Expected to find Noop policy, but got " + enforcement.getClass().getSimpleName(), + enforcement instanceof BulkLoadVerifyingViolationPolicyEnforcement); + + // Should generate two files, each of which is over 25KB + ClientServiceCallable<Void> callable = generateFileToLoad(tn, 2, 500); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + /* Lists the same directory that generateFileToLoad writes into: the home directory plus the test method name and "_files". */ FileStatus[] files = fs.listStatus( + new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files")); + for (FileStatus file : files) { + assertTrue( + "Expected the file, " + file.getPath() + ", length to be larger
than 25KB, but was " + + file.getLen(), + file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE); + LOG.debug(file.getPath() + " -> " + file.getLen() +"B"); + } + + RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); + RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); + try { + caller.callWithRetries(callable, Integer.MAX_VALUE); + fail("Expected the bulk load call to fail!"); + } catch (SpaceLimitingException e) { + // Pass + LOG.trace("Caught expected exception", e); + } + // Verify that we have no data in the table because neither file should have been + // loaded even though one of the files could have. + // Both the Table and its scanner are closed here, even when the assertion fails. + try (Table table = TEST_UTIL.getConnection().getTable(tn); + ResultScanner scanner = table.getScanner(new Scan())) { + assertNull("Expected no results", scanner.next()); + } + } + + /** + * Collects the region sizes the master currently holds for one table. + * + * @param tn the table whose regions are of interest + * @return the entries of the master quota manager's region-size snapshot whose region belongs + * to {@code tn}; empty when none match + */ + private Map<HRegionInfo,Long> getReportedSizesForTable(TableName tn) { + HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + MasterQuotaManager quotaManager = master.getMasterQuotaManager(); + Map<HRegionInfo,Long> filteredRegionSizes = new HashMap<>(); + for (Entry<HRegionInfo,Long> entry : quotaManager.snapshotRegionSizes().entrySet()) { + if (entry.getKey().getTable().equals(tn)) { + filteredRegionSizes.put(entry.getKey(), entry.getValue()); + } + } + return filteredRegionSizes; + } + + /** + * Creates a table through the helper, limits it to 2 * ONE_MEGABYTE with the given policy, + * writes 3 * ONE_MEGABYTE into it and then pauses so the quota chores can observe the change. + * + * @param policyToViolate the violation policy attached to the table's space quota + * @return the name of the table that was created and written to + */ + private TableName writeUntilViolation(SpaceViolationPolicy policyToViolate) throws Exception { + TableName tn = helper.createTableWithRegions(10); + + final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; + QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, policyToViolate); + TEST_UTIL.getAdmin().setQuota(settings); + + // Write more data than should be allowed and flush it to disk + helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE); + + // This should be sufficient time for the chores to run and see the change.
+ Thread.sleep(5000); + + return tn; + } + + private TableName writeUntilViolationAndVerifyViolation( + SpaceViolationPolicy policyToViolate, Mutation m) throws Exception { + final TableName tn = writeUntilViolation(policyToViolate); + + // But let's try a few times to get the exception before failing + boolean sawError = false; + for (int i = 0; i < NUM_RETRIES && !sawError; i++) { + try (Table table = TEST_UTIL.getConnection().getTable(tn)) { + if (m instanceof Put) { + table.put((Put) m); + } else if (m instanceof Delete) { + table.delete((Delete) m); + } else if (m instanceof Append) { + table.append((Append) m); + } else if (m instanceof Increment) { + table.increment((Increment) m); + } else { + fail( + "Failed to apply " + m.getClass().getSimpleName() + + " to the table. Programming error"); + } + LOG.info("Did not reject the " + m.getClass().getSimpleName() + ", will sleep and retry"); + Thread.sleep(2000); + } catch (Exception e) { + String msg = StringUtils.stringifyException(e); + assertTrue("Expected exception message to contain the word '" + policyToViolate.name() + + "', but was " + msg, msg.contains(policyToViolate.name())); + sawError = true; + } + } + if (!sawError) { + try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME)) { + ResultScanner scanner = quotaTable.getScanner(new Scan()); + Result result = null; + LOG.info("Dumping contents of hbase:quota table"); + while ((result = scanner.next()) != null) { + LOG.info(Bytes.toString(result.getRow()) + " => " + result.toString()); + } + scanner.close(); + } + } + assertTrue( + "Expected to see an exception writing data to a table exceeding its quota", sawError); + + return tn; + } + + private ClientServiceCallable<Void> generateFileToLoad( + TableName tn, int numFiles, int numRowsPerFile) throws Exception { + Connection conn = TEST_UTIL.getConnection(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration conf = TEST_UTIL.getConfiguration(); + Path baseDir 
= new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"); + fs.mkdirs(baseDir); + final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(); + for (int i = 1; i <= numFiles; i++) { + Path hfile = new Path(baseDir, "file" + i); + TestHRegionServerBulkLoad.createHFile( + fs, hfile, Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), + Bytes.toBytes("reject"), numRowsPerFile); + famPaths.add(new Pair<>(Bytes.toBytes(SpaceQuotaHelperForTests.F1), hfile.toString())); + } + + // bulk load HFiles + // The Table is only needed to obtain the bulk load token, so close it as soon as we have it. + final String bulkToken; + try (Table table = conn.getTable(tn)) { + bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn); + } + return new ClientServiceCallable<Void>(conn, + tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController()) { + @Override + public Void rpcCall() throws Exception { + SecureBulkLoadClient secureClient = null; + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + // A fresh Table is opened per call and closed once the load request has been issued. + try (Table table = conn.getTable(getTableName())) { + secureClient = new SecureBulkLoadClient(conf, table); + secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, + true, null, bulkToken); + } + return null; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java index efc046b..cefed67 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.HRegionInfo; import
org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; @@ -44,7 +44,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; /** - * Test class for {@link TableQuotaViolationStore}. + * Test class for {@link TableQuotaSnapshotStore}. */ @Category(SmallTests.class) public class TestTableQuotaViolationStore { @@ -53,14 +53,14 @@ public class TestTableQuotaViolationStore { private Connection conn; private QuotaObserverChore chore; private Map<HRegionInfo, Long> regionReports; - private TableQuotaViolationStore store; + private TableQuotaSnapshotStore store; @Before public void setup() { conn = mock(Connection.class); chore = mock(QuotaObserverChore.class); regionReports = new HashMap<>(); - store = new TableQuotaViolationStore(conn, chore, regionReports); + store = new TableQuotaSnapshotStore(conn, chore, regionReports); } @Test @@ -108,23 +108,29 @@ public class TestTableQuotaViolationStore { regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(0), Bytes.toBytes(1)), 1024L * 512L); regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), Bytes.toBytes(2)), 1024L * 256L); + SpaceQuotaSnapshot tn1Snapshot = new SpaceQuotaSnapshot( + SpaceQuotaStatus.notInViolation(), 1024L * 768L, 1024L * 1024L); + // Below the quota - assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(tn1, quota)); + assertEquals(tn1Snapshot, store.getTargetState(tn1, quota)); regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(2), Bytes.toBytes(3)), 1024L * 256L); + tn1Snapshot = new SpaceQuotaSnapshot(SpaceQuotaStatus.notInViolation(), 1024L * 1024L, 1024L * 1024L); // 
Equal to the quota is still in observance - assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(tn1, quota)); + assertEquals(tn1Snapshot, store.getTargetState(tn1, quota)); regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(3), Bytes.toBytes(4)), 1024L); + tn1Snapshot = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L * 1024L + 1024L, 1024L * 1024L); // Exceeds the quota, should be in violation - assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(tn1, quota)); + assertEquals(tn1Snapshot, store.getTargetState(tn1, quota)); } @Test public void testGetSpaceQuota() throws Exception { - TableQuotaViolationStore mockStore = mock(TableQuotaViolationStore.class); + TableQuotaSnapshotStore mockStore = mock(TableQuotaSnapshotStore.class); when(mockStore.getSpaceQuota(any(TableName.class))).thenCallRealMethod(); Quotas quotaWithSpace = Quotas.newBuilder().setSpace( http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java index 4a7000c..d190c8c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java @@ -30,12 +30,11 @@ import java.util.Objects; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; -import 
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; @@ -44,17 +43,17 @@ import org.junit.experimental.categories.Category; import org.mockito.ArgumentMatcher; /** - * Test case for {@link TableSpaceQuotaViolationNotifier}. + * Test case for {@link TableSpaceQuotaSnapshotNotifier}. */ @Category(SmallTests.class) public class TestTableSpaceQuotaViolationNotifier { - private TableSpaceQuotaViolationNotifier notifier; + private TableSpaceQuotaSnapshotNotifier notifier; private Connection conn; @Before public void setup() throws Exception { - notifier = new TableSpaceQuotaViolationNotifier(); + notifier = new TableSpaceQuotaSnapshotNotifier(); conn = mock(Connection.class); notifier.initialize(conn); } @@ -62,35 +61,25 @@ public class TestTableSpaceQuotaViolationNotifier { @Test public void testToViolation() throws Exception { final TableName tn = TableName.valueOf("inviolation"); - final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS; + final SpaceQuotaSnapshot snapshot = new SpaceQuotaSnapshot( + new SpaceQuotaStatus(SpaceViolationPolicy.NO_INSERTS), 1024L, 512L); final Table quotaTable = mock(Table.class); when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable); final Put expectedPut = new Put(Bytes.toBytes("t." 
+ tn.getNameAsString())); - final SpaceQuota protoQuota = SpaceQuota.newBuilder() - .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy)) + final QuotaProtos.SpaceQuotaSnapshot protoQuota = QuotaProtos.SpaceQuotaSnapshot.newBuilder() + .setStatus(QuotaProtos.SpaceQuotaStatus.newBuilder().setInViolation(true).setPolicy( + org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy.NO_INSERTS)) + .setLimit(512L) + .setUsage(1024L) .build(); - expectedPut.addColumn(Bytes.toBytes("u"), Bytes.toBytes("v"), protoQuota.toByteArray()); + expectedPut.addColumn(Bytes.toBytes("u"), Bytes.toBytes("p"), protoQuota.toByteArray()); - notifier.transitionTableToViolation(tn, policy); + notifier.transitionTable(tn, snapshot); verify(quotaTable).put(argThat(new SingleCellPutMatcher(expectedPut))); } - @Test - public void testToObservance() throws Exception { - final TableName tn = TableName.valueOf("notinviolation"); - final Table quotaTable = mock(Table.class); - when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable); - - final Delete expectedDelete = new Delete(Bytes.toBytes("t." + tn.getNameAsString())); - expectedDelete.addColumn(Bytes.toBytes("u"), Bytes.toBytes("v")); - - notifier.transitionTableToObservance(tn); - - verify(quotaTable).delete(argThat(new SingleCellDeleteMatcher(expectedDelete))); - } - /** * Parameterized for Puts. */ @@ -101,15 +90,6 @@ public class TestTableSpaceQuotaViolationNotifier { } /** - * Parameterized for Deletes. - */ - private static class SingleCellDeleteMatcher extends SingleCellMutationMatcher<Delete> { - private SingleCellDeleteMatcher(Delete expected) { - super(expected); - } - } - - /** * Quick hack to verify a Mutation with one column. 
*/ private static class SingleCellMutationMatcher<T> extends ArgumentMatcher<T> { http://git-wip-us.apache.org/repos/asf/hbase/blob/0d76d667/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java index bb8d5cd..3d0aec6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java @@ -96,7 +96,7 @@ public class TestTablesWithQuotas { final Map<TableName,Integer> reportedRegions = new HashMap<>(); final Map<TableName,Integer> actualRegions = new HashMap<>(); final Configuration conf = HBaseConfiguration.create(); - conf.setDouble(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY, 0.95); + conf.setDouble(QuotaObserverChore.QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY, 0.95); TableName tooFewRegionsTable = TableName.valueOf("tn1"); TableName sufficientRegionsTable = TableName.valueOf("tn2"); @@ -114,7 +114,7 @@ public class TestTablesWithQuotas { } @Override - int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore) { + int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) { return reportedRegions.get(table); } }; @@ -177,13 +177,13 @@ public class TestTablesWithQuotas { QuotaObserverChore chore = mock(QuotaObserverChore.class); Map<HRegionInfo,Long> regionUsage = new HashMap<>(); - TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, regionUsage); + TableQuotaSnapshotStore store = new TableQuotaSnapshotStore(conn, chore, regionUsage); // A super dirty hack to verify that, after getting no regions for our table, // we bail out and start processing the next element (which there is 
none). final TablesWithQuotas tables = new TablesWithQuotas(conn, conf) { @Override - int getNumReportedRegions(TableName table, QuotaViolationStore<TableName> tableStore) { + int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) { throw new RuntimeException("Should should not reach here"); } };