Repository: hbase Updated Branches: refs/heads/branch-2 8f0f820f2 -> 9c29a6211
HBASE-19311 Promote TestAcidGuarantees to LargeTests and start mini cluster once to make it faster Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9c29a621 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9c29a621 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9c29a621 Branch: refs/heads/branch-2 Commit: 9c29a6211b7272a91aa520d8ac4343e148a6a0ca Parents: 8f0f820 Author: zhangduo <zhang...@apache.org> Authored: Tue Nov 21 21:18:54 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Nov 22 14:10:17 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/util/AbstractHBaseTool.java | 10 +- .../hbase/IntegrationTestAcidGuarantees.java | 60 ++- .../hadoop/hbase/AcidGuaranteesTestTool.java | 415 ++++++++++++++++ .../apache/hadoop/hbase/TestAcidGuarantees.java | 498 +++---------------- 4 files changed, 510 insertions(+), 473 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9c29a621/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java index b808d3e..e301c1f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with this * work for additional information regarding copyright ownership. 
The ASF
@@ -231,6 +231,14 @@ public abstract class AbstractHBaseTool implements Tool, Configurable {
     }
   }
 
+  /**
+   * Reads a command-line option as a {@code long}.
+   *
+   * @param cmd the parsed command line
+   * @param opt name of the option to read
+   * @param defaultValue value returned when {@code opt} was not given; typed {@code long} so the
+   *          default covers the same range as the result ({@code int} arguments still widen)
+   * @return the option value parsed with {@link Long#parseLong(String)}, or {@code defaultValue}
+   * @throws NumberFormatException if the supplied value is not a parsable {@code long}
+   */
+  public long getOptionAsLong(CommandLine cmd, String opt, long defaultValue) {
+    if (cmd.hasOption(opt)) {
+      return Long.parseLong(cmd.getOptionValue(opt));
+    } else {
+      return defaultValue;
+    }
+  }
+
   public double getOptionAsDouble(CommandLine cmd, String opt, double defaultValue) {
     if (cmd.hasOption(opt)) {
       return Double.parseDouble(cmd.getOptionValue(opt));
http://git-wip-us.apache.org/repos/asf/hbase/blob/9c29a621/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java
index e1c17a4..3c1e6ad 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java
@@ -17,26 +17,30 @@
  */
 package org.apache.hadoop.hbase;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_A;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_B;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_C;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.TABLE_NAME;
+
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
-import org.apache.hadoop.hbase.regionserver.MemStoreCompactor;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import 
java.util.Set;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
 
 /**
  * This Integration Test verifies acid guarantees across column families by frequently writing
  * values to rows with multiple column families and concurrently reading entire rows that expect all
  * column families.
- *
  * <p>
  * Sample usage:
+ *
  * <pre>
  * hbase org.apache.hadoop.hbase.IntegrationTestAcidGuarantees -Dmillis=10000 -DnumWriters=50
  * -DnumGetters=2 -DnumScanners=2 -DnumUniqueRows=5
@@ -47,19 +51,11 @@ public class IntegrationTestAcidGuarantees extends IntegrationTestBase {
   private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
 
   // The unit test version.
-  TestAcidGuarantees tag;
+  AcidGuaranteesTestTool tool;
 
   @Override
   public int runTestFromCommandLine() throws Exception {
-    Configuration c = getConf();
-    int millis = c.getInt("millis", 5000);
-    int numWriters = c.getInt("numWriters", 50);
-    int numGetters = c.getInt("numGetters", 2);
-    int numScanners = c.getInt("numScanners", 2);
-    int numUniqueRows = c.getInt("numUniqueRows", 3);
-    boolean useMob = c.getBoolean("useMob",false);
-    tag.runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob);
-    return 0;
+    // Forward the -D settings shown in the class comment's sample usage: the tool reads its
+    // settings from command-line options only, so an empty argument list would ignore them and
+    // always run with the defaults.
+    Configuration c = getConf();
+    return tool.run(toolArgs(c.getLong("millis", 5000), c.getInt("numWriters", 50),
+      c.getInt("numGetters", 2), c.getInt("numScanners", 2), c.getInt("numUniqueRows", 3),
+      c.getBoolean("useMob", false)));
   }
 
   @Override
@@ -68,50 +64,50 @@ public class IntegrationTestAcidGuarantees extends IntegrationTestBase {
     util = getTestingUtil(getConf());
     util.initializeCluster(SERVER_COUNT);
     conf = getConf();
-    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128 * 1024));
     // prevent aggressive region split
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
-            ConstantSizeRegionSplitPolicy.class.getName());
-    this.setConf(util.getConfiguration());
+      ConstantSizeRegionSplitPolicy.class.getName());
 
-    // replace the HBaseTestingUtility in the unit test with the integration test's
-    // IntegrationTestingUtility
-    tag = new TestAcidGuarantees(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
-    tag.setHBaseTestingUtil(util);
+    tool = new AcidGuaranteesTestTool();
+    tool.setConf(getConf());
   }
 
   @Override
   public TableName getTablename() {
-    return TestAcidGuarantees.TABLE_NAME;
+    return TABLE_NAME;
   }
 
   @Override
   protected Set<String> getColumnFamilies() {
-    return Sets.newHashSet(Bytes.toString(TestAcidGuarantees.FAMILY_A),
-            Bytes.toString(TestAcidGuarantees.FAMILY_B),
-            Bytes.toString(TestAcidGuarantees.FAMILY_C));
+    return Sets.newHashSet(Bytes.toString(FAMILY_A), Bytes.toString(FAMILY_B),
+      Bytes.toString(FAMILY_C));
   }
 
-  // ***** Actual integration tests
+  /**
+   * Builds the option list passed to {@link AcidGuaranteesTestTool}; {@code -useMob} is a flag
+   * without a value, so it is appended only when requested.
+   */
+  private static String[] toolArgs(long millisToRun, int numWriters, int numGetters,
+      int numScanners, int numUniqueRows, boolean useMob) {
+    String[] base = { "-millis", String.valueOf(millisToRun), "-numWriters",
+        String.valueOf(numWriters), "-numGetters", String.valueOf(numGetters), "-numScanners",
+        String.valueOf(numScanners), "-numUniqueRows", String.valueOf(numUniqueRows) };
+    if (!useMob) {
+      return base;
+    }
+    String[] withMob = new String[base.length + 1];
+    System.arraycopy(base, 0, withMob, 0, base.length);
+    withMob[base.length] = "-useMob";
+    return withMob;
+  }
+
+  /**
+   * Runs the tool with the given workload and fails the calling test on a non-zero exit status.
+   */
+  private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
+      int numUniqueRows) throws Exception {
+    int status =
+      tool.run(toolArgs(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false));
+    // run() reports its outcome as an exit status (runTestFromCommandLine returns it as such), so
+    // a non-zero value must not be dropped here.
+    if (status != 0) {
+      throw new AssertionError("AcidGuaranteesTestTool exited with status " + status);
+    }
+  }
 
+  // ***** Actual integration tests
   @Test
   public void testGetAtomicity() throws Exception {
-    tag.runTestAtomicity(20000, 4, 4, 0, 3);
+    runTestAtomicity(20000, 4, 4, 0, 3);
   }
 
   @Test
   public void testScanAtomicity() throws Exception {
-    tag.runTestAtomicity(20000, 3, 0, 2, 3);
+    runTestAtomicity(20000, 3, 0, 2, 3);
   }
 
   @Test
   public void testMixedAtomicity() throws Exception {
-    tag.runTestAtomicity(20000, 4, 2, 2, 3);
+    runTestAtomicity(20000, 4, 2, 2, 3);
   }
 
-  // **** Command line hook
-
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     IntegrationTestingUtility.setUseDistributedCluster(conf);
http://git-wip-us.apache.org/repos/asf/hbase/blob/9c29a621/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java new file mode 100644 index 0000000..5e00e8c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.AbstractHBaseTool; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; + +/** + * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying + * that reads never see partially-complete writes + */ +@InterfaceAudience.Private +public class AcidGuaranteesTestTool extends AbstractHBaseTool { 
+
+  // Log under this class's own name so the tool's output is attributed to the tool.
+  private static final Log LOG = LogFactory.getLog(AcidGuaranteesTestTool.class);
+
+  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
+  public static final byte[] FAMILY_A = Bytes.toBytes("A");
+  public static final byte[] FAMILY_B = Bytes.toBytes("B");
+  public static final byte[] FAMILY_C = Bytes.toBytes("C");
+  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
+
+  public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };
+
+  /** Number of columns ("col0", "col1", ...) written to and verified in every family. */
+  public static int NUM_COLS_TO_CHECK = 50;
+
+  /** Executor handed to every client connection; created and shut down by {@link #doWork()}. */
+  private ExecutorService sharedPool;
+
+  // Settings filled in by processOptions().
+  private long millisToRun;
+  private int numWriters;
+  private int numGetters;
+  private int numScanners;
+  private int numUniqueRows;
+  private boolean crazyFlush;
+  private boolean useMob;
+
+  /**
+   * Creates the executor shared by all connections opened by the worker threads.
+   * @return a pool of daemon threads whose idle core threads are allowed to time out
+   */
+  private ExecutorService createThreadPool() {
+    int maxThreads = 256;
+    int coreThreads = 128;
+
+    long keepAliveTime = 60;
+    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(
+        maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
+
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
+        TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(toString() + "-shared"));
+    tpe.allowCoreThreadTimeOut(true);
+    return tpe;
+  }
+
+  /** Declares the command-line options read back in {@link #processOptions(CommandLine)}. */
+  @Override
+  protected void addOptions() {
+    addOptWithArg("millis", "time limit in milliseconds");
+    addOptWithArg("numWriters", "number of write threads");
+    addOptWithArg("numGetters", "number of get threads");
+    addOptWithArg("numScanners", "number of scan threads");
+    addOptWithArg("numUniqueRows", "number of unique rows to test");
+    addOptNoArg("crazyFlush",
+      "if specified we will flush continuously otherwise will flush every minute");
+    addOptNoArg("useMob", "if specified we will enable mob on the first column family");
+  }
+
+  /** Copies the parsed options into the fields, using the defaults below for absent ones. */
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    millisToRun = getOptionAsLong(cmd, "millis", 5000);
+    numWriters = getOptionAsInt(cmd, "numWriters", 50);
+    numGetters = getOptionAsInt(cmd, "numGetters", 2);
+    numScanners = getOptionAsInt(cmd, "numScanners", 2);
+    numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
+    crazyFlush = cmd.hasOption("crazyFlush");
+    useMob = cmd.hasOption("useMob");
+  }
+
+  /**
+   * Runs one atomicity test with the configured settings.
+   * @return 0 when the run completes without an exception
+   */
+  @Override
+  protected int doWork() throws Exception {
+    sharedPool = createThreadPool();
+    // The Admin is a resource of its own: declare it after the connection so that it is closed
+    // first, instead of never being closed at all.
+    try (Connection conn = ConnectionFactory.createConnection(getConf());
+        Admin admin = conn.getAdmin()) {
+      runTestAtomicity(admin);
+    } finally {
+      sharedPool.shutdown();
+    }
+    return 0;
+  }
+
+  /**
+   * Thread that does random full-row writes into a table.
+   */
+  public static class AtomicityWriter extends RepeatingTestThread {
+    Random rand = new Random();
+    byte data[] = new byte[10];
+    byte[][] targetRows;
+    byte[][] targetFamilies;
+    Connection connection;
+    Table table;
+    AtomicLong numWritten = new AtomicLong();
+
+    public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
+        ExecutorService pool) throws IOException {
+      super(ctx);
+      this.targetRows = targetRows;
+      this.targetFamilies = targetFamilies;
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+      table = connection.getTable(TABLE_NAME);
+    }
+
+    /**
+     * Writes the same fresh random value into every checked column of every family of one
+     * randomly chosen row, as a single Put.
+     */
+    @Override
+    public void doAnAction() throws Exception {
+      // Pick a random row to write into
+      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
+      Put p = new Put(targetRow);
+      rand.nextBytes(data);
+
+      for (byte[] family : targetFamilies) {
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          p.addColumn(family, qualifier, data);
+        }
+      }
+      table.put(p);
+      numWritten.getAndIncrement();
+    }
+
+    /** Closes the table and then, even if that fails, the connection. */
+    @Override
+    public void workDone() throws IOException {
+      try {
+        table.close();
+      } finally {
+        connection.close();
+      }
+    }
+  }
+
+  /**
+   * Thread that does single-row reads in a table, looking for partially completed rows.
+   */
+  public static class AtomicGetReader extends RepeatingTestThread {
+    byte[] targetRow;
+    byte[][] targetFamilies;
+    Connection connection;
+    Table table;
+    int numVerified = 0;
+    AtomicLong numRead = new AtomicLong();
+
+    public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
+        ExecutorService pool) throws IOException {
+      super(ctx);
+      this.targetRow = targetRow;
+      this.targetFamilies = targetFamilies;
+      this.connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+      this.table = this.connection.getTable(TABLE_NAME);
+    }
+
+    /**
+     * Fetches the target row once and compares each checked column with the one before it; the
+     * first difference is reported through gotFailure.
+     */
+    public void doAnAction() throws Exception {
+      Result row = table.get(new Get(targetRow));
+      if (row.getRow() == null) {
+        // Nothing to verify: the row was not found, most likely because no writer has
+        // written it yet.
+        return;
+      }
+
+      byte[] previous = null;
+      for (byte[] family : targetFamilies) {
+        int col = 0;
+        while (col < NUM_COLS_TO_CHECK) {
+          byte[] current = row.getValue(family, Bytes.toBytes("col" + col));
+          boolean mismatch = previous != null && !Bytes.equals(previous, current);
+          if (mismatch) {
+            gotFailure(previous, row);
+          }
+          numVerified++;
+          previous = current;
+          col++;
+        }
+      }
+      numRead.getAndIncrement();
+    }
+
+    /** Releases the table first; the connection is closed even when that fails. */
+    @Override
+    public void workDone() throws IOException {
+      try {
+        table.close();
+      } finally {
+        connection.close();
+      }
+    }
+
+    /** Throws a RuntimeException that lists the expected value and every cell of the row. */
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder report = new StringBuilder("Failed after ");
+      report.append(numVerified).append("!");
+      report.append("Expected=").append(Bytes.toStringBinary(expected)).append("Got:\n");
+      for (Cell cell : res.listCells()) {
+        report.append(cell.toString())
+            .append(" val= ")
+            .append(Bytes.toStringBinary(CellUtil.cloneValue(cell)))
+            .append("\n");
+      }
+      throw new RuntimeException(report.toString());
+    }
+  }
+
+  /**
+   * Thread that does full scans of the table looking for any partially completed rows.
+   */
+  public static class AtomicScanReader extends RepeatingTestThread {
+    byte[][] targetFamilies;
+    Table table;
+    Connection connection;
+    AtomicLong numScans = new AtomicLong();
+    AtomicLong numRowsScanned = new AtomicLong();
+
+    public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
+        throws IOException {
+      super(ctx);
+      this.targetFamilies = targetFamilies;
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+      table = connection.getTable(TABLE_NAME);
+    }
+
+    /**
+     * Scans the target families once and, for every row returned, compares each checked column
+     * with the one before it; the first difference is reported through gotFailure.
+     */
+    @Override
+    public void doAnAction() throws Exception {
+      Scan s = new Scan();
+      for (byte[] family : targetFamilies) {
+        s.addFamily(family);
+      }
+      // Close the scanner after every pass, also when gotFailure throws, so that repeated
+      // actions do not leave scanners open.
+      try (ResultScanner scanner = table.getScanner(s)) {
+        for (Result res : scanner) {
+          byte[] gotValue = null;
+
+          for (byte[] family : targetFamilies) {
+            for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+              byte qualifier[] = Bytes.toBytes("col" + i);
+              byte thisValue[] = res.getValue(family, qualifier);
+              if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+                gotFailure(gotValue, res);
+              }
+              gotValue = thisValue;
+            }
+          }
+          numRowsScanned.getAndIncrement();
+        }
+      }
+      numScans.getAndIncrement();
+    }
+
+    /** Closes the table and then, even if that fails, the connection. */
+    @Override
+    public void workDone() throws IOException {
+      try {
+        table.close();
+      } finally {
+        connection.close();
+      }
+    }
+
+    /** Throws a RuntimeException that lists the expected value and every cell of the row. */
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Failed after ").append(numRowsScanned).append("!");
+      msg.append("Expected=").append(Bytes.toStringBinary(expected));
+      msg.append("Got:\n");
+      for (Cell kv : res.listCells()) {
+        msg.append(kv.toString());
+        msg.append(" val= ");
+        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
+        msg.append("\n");
+      }
+      throw new RuntimeException(msg.toString());
+    }
+  }
+
+  /**
+   * Creates the test table with all families when it does not exist yet, then switches MOB on the
+   * first column family on or off so that it matches {@code useMob}.
+   */
+  private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
+    if (!admin.tableExists(TABLE_NAME)) {
+      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
+      Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
+          .forEachOrdered(builder::addColumnFamily);
+      admin.createTable(builder.build());
+    }
+    ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
+    if (cfd.isMobEnabled() != useMob) {
+      // The 4-byte threshold is smaller than the 10-byte values the writers put.
+      admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
+          .setMobEnabled(useMob).setMobThreshold(4).build());
+    }
+  }
+
+  /**
+   * Starts the configured writers, getters and scanners plus one flusher thread, lets them run
+   * for {@code millisToRun} milliseconds, stops them and logs how much work each thread did.
+   */
+  private void runTestAtomicity(Admin admin) throws Exception {
+    createTableIfMissing(admin, useMob);
+    TestContext ctx = new TestContext(conf);
+
+    byte rows[][] = new byte[numUniqueRows][];
+    for (int i = 0; i < numUniqueRows; i++) {
+      rows[i] = Bytes.toBytes("test_row_" + i);
+    }
+
+    List<AtomicityWriter> writers = Lists.newArrayList();
+    for (int i = 0; i < numWriters; i++) {
+      AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
+      writers.add(writer);
+      ctx.addThread(writer);
+    }
+    // Add a flusher
+    ctx.addThread(new RepeatingTestThread(ctx) {
+      @Override
+      public void doAnAction() throws Exception {
+        try {
+          admin.flush(TABLE_NAME);
+        } catch (IOException ioe) {
+          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
+        }
+        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
+        // we would flush as often as possible. On a running cluster, this isn't practical:
+        // (1) we will cause a lot of load due to all the flushing and compacting
+        // (2) we cannot change the flushing/compacting related Configuration options to try to
+        // alleviate this
+        // (3) it is an unrealistic workload, since no one would actually flush that often.
+        // Therefore, let's flush every minute to have more flushes than usual, but not overload
+        // the running cluster.
+        if (!crazyFlush) {
+          Thread.sleep(60000);
+        }
+      }
+    });
+
+    List<AtomicGetReader> getters = Lists.newArrayList();
+    for (int i = 0; i < numGetters; i++) {
+      // Getters are spread over the rows round-robin.
+      AtomicGetReader getter =
+          new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
+      getters.add(getter);
+      ctx.addThread(getter);
+    }
+
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+
+    LOG.info("Finished test. Writers:");
+    for (AtomicityWriter writer : writers) {
+      LOG.info(" wrote " + writer.numWritten.get());
+    }
+    LOG.info("Readers:");
+    for (AtomicGetReader reader : getters) {
+      LOG.info(" read " + reader.numRead.get());
+    }
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info(" scanned " + scanner.numScans.get());
+      LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+
+  /**
+   * Command-line entry point: runs the tool through ToolRunner and exits with its status, or
+   * with -1 when the run throws.
+   */
+  public static void main(String[] args) {
+    Configuration c = HBaseConfiguration.create();
+    int status;
+    try {
+      AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
+      status = ToolRunner.run(c, test, args);
+    } catch (Exception e) {
+      LOG.error("Exiting due to error", e);
+      status = -1;
+    }
+    System.exit(status);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9c29a621/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
index 9e845ad..e42794e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
@@ -18,434 +18,103 @@
  */
 package 
org.apache.hadoop.hbase; -import java.io.IOException; +import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILIES; +import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.TABLE_NAME; + import java.util.List; -import java.util.Random; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; -import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.CompactingMemStore; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.MemStoreLAB; import org.apache.hadoop.hbase.testclassification.FlakeyTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; +import 
org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; - -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; /** - * Test case that uses multiple threads to read and write multifamily rows - * into a table, verifying that reads never see partially-complete writes. - * - * This can run as a junit test, or with a main() function which runs against - * a real cluster (eg for testing with failures, region movement, etc) + * Test case that uses multiple threads to read and write multifamily rows into a table, verifying + * that reads never see partially-complete writes. This can run as a junit test, or with a main() + * function which runs against a real cluster (eg for testing with failures, region movement, etc) */ -@Category({FlakeyTests.class, MediumTests.class}) +@Category({ FlakeyTests.class, LargeTests.class }) @RunWith(Parameterized.class) -public class TestAcidGuarantees implements Tool { - @Parameterized.Parameters +public class TestAcidGuarantees { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @Parameterized.Parameters(name = "{index}: compType={0}") public static Object[] data() { return new Object[] { "NONE", "BASIC", "EAGER" }; } - protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class); - public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees"); - public static final byte [] FAMILY_A = Bytes.toBytes("A"); - public static final byte [] FAMILY_B = Bytes.toBytes("B"); - public static final byte [] FAMILY_C = Bytes.toBytes("C"); - public static final byte [] QUALIFIER_NAME = 
Bytes.toBytes("data"); - - public static final byte[][] FAMILIES = new byte[][] { - FAMILY_A, FAMILY_B, FAMILY_C }; - private HBaseTestingUtility util; + @Parameter + public String compType; - public static int NUM_COLS_TO_CHECK = 50; - - // when run as main - private Configuration conf; - private ExecutorService sharedPool = null; - - private void createTableIfMissing(boolean useMob) - throws IOException { - try { - util.createTable(TABLE_NAME, FAMILIES); - } catch (TableExistsException tee) { - } + private AcidGuaranteesTestTool tool = new AcidGuaranteesTestTool(); - if (useMob) { - HTableDescriptor htd = new HTableDescriptor(util.getAdmin().getTableDescriptor(TABLE_NAME)); - HColumnDescriptor hcd = htd.getColumnFamilies()[0]; - // force mob enabled such that all data is mob data - hcd.setMobEnabled(true); - hcd.setMobThreshold(4); - util.getAdmin().modifyColumnFamily(TABLE_NAME, hcd); - } - } - - public TestAcidGuarantees(String compType) { + @BeforeClass + public static void setUpBeforeClass() throws Exception { // Set small flush size for minicluster so we exercise reseeking scanners - Configuration conf = HBaseConfiguration.create(); + Configuration conf = UTIL.getConfiguration(); conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128 * 1024)); // prevent aggressive region split conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, - ConstantSizeRegionSplitPolicy.class.getName()); + ConstantSizeRegionSplitPolicy.class.getName()); conf.setInt("hfile.format.version", 3); // for mob tests - conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, compType); - if(MemoryCompactionPolicy.valueOf(compType) == MemoryCompactionPolicy.EAGER) { - conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false); - conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.9); - } - util = new HBaseTestingUtility(conf); - sharedPool = createThreadPool(); + UTIL.startMiniCluster(1); } - public void setHBaseTestingUtil(HBaseTestingUtility util) { - this.util = 
util; + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); } - private ExecutorService createThreadPool() { - - int maxThreads = 256; - int coreThreads = 128; - - long keepAliveTime = 60; - BlockingQueue<Runnable> workQueue = - new LinkedBlockingQueue<Runnable>(maxThreads * - HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); - - ThreadPoolExecutor tpe = new ThreadPoolExecutor( - coreThreads, - maxThreads, - keepAliveTime, - TimeUnit.SECONDS, - workQueue, - Threads.newDaemonThreadFactory(toString() + "-shared")); - tpe.allowCoreThreadTimeOut(true); - return tpe; - } - - public ExecutorService getSharedThreadPool() { - return sharedPool; - } - - /** - * Thread that does random full-row writes into a table. - */ - public static class AtomicityWriter extends RepeatingTestThread { - Random rand = new Random(); - byte data[] = new byte[10]; - byte targetRows[][]; - byte targetFamilies[][]; - Connection connection; - Table table; - AtomicLong numWritten = new AtomicLong(); - - public AtomicityWriter(TestContext ctx, byte targetRows[][], - byte targetFamilies[][], ExecutorService pool) throws IOException { - super(ctx); - this.targetRows = targetRows; - this.targetFamilies = targetFamilies; - connection = ConnectionFactory.createConnection(ctx.getConf(), pool); - table = connection.getTable(TABLE_NAME); - } - public void doAnAction() throws Exception { - // Pick a random row to write into - byte[] targetRow = targetRows[rand.nextInt(targetRows.length)]; - Put p = new Put(targetRow); - rand.nextBytes(data); - - for (byte[] family : targetFamilies) { - for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { - byte qualifier[] = Bytes.toBytes("col" + i); - p.addColumn(family, qualifier, data); - } - } - table.put(p); - numWritten.getAndIncrement(); - } - - @Override - public void workDone() throws IOException { - try { - table.close(); - } finally { - connection.close(); - } - } - } - - /** - * Thread that does single-row reads in a 
table, looking for partially - * completed rows. - */ - public static class AtomicGetReader extends RepeatingTestThread { - byte targetRow[]; - byte targetFamilies[][]; - Connection connection; - Table table; - int numVerified = 0; - AtomicLong numRead = new AtomicLong(); - - public AtomicGetReader(TestContext ctx, byte targetRow[], - byte targetFamilies[][], ExecutorService pool) throws IOException { - super(ctx); - this.targetRow = targetRow; - this.targetFamilies = targetFamilies; - connection = ConnectionFactory.createConnection(ctx.getConf(), pool); - table = connection.getTable(TABLE_NAME); - } - - public void doAnAction() throws Exception { - Get g = new Get(targetRow); - Result res = table.get(g); - byte[] gotValue = null; - if (res.getRow() == null) { - // Trying to verify but we didn't find the row - the writing - // thread probably just hasn't started writing yet, so we can - // ignore this action - return; - } - - for (byte[] family : targetFamilies) { - for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { - byte qualifier[] = Bytes.toBytes("col" + i); - byte thisValue[] = res.getValue(family, qualifier); - if (gotValue != null && !Bytes.equals(gotValue, thisValue)) { - gotFailure(gotValue, res); - } - numVerified++; - gotValue = thisValue; - } - } - numRead.getAndIncrement(); - } - - @Override - public void workDone() throws IOException { - try { - table.close(); - } finally { - connection.close(); - } - } - - private void gotFailure(byte[] expected, Result res) { - StringBuilder msg = new StringBuilder(); - msg.append("Failed after ").append(numVerified).append("!"); - msg.append("Expected=").append(Bytes.toStringBinary(expected)); - msg.append("Got:\n"); - for (Cell kv : res.listCells()) { - msg.append(kv.toString()); - msg.append(" val= "); - msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv))); - msg.append("\n"); - } - throw new RuntimeException(msg.toString()); + @Before + public void setUp() throws Exception { + TableDescriptorBuilder builder = 
TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setValue(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, compType); + if (MemoryCompactionPolicy.valueOf(compType) == MemoryCompactionPolicy.EAGER) { + builder.setValue(MemStoreLAB.USEMSLAB_KEY, "false"); + builder.setValue(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, "0.9"); } + Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of) + .forEachOrdered(builder::addColumnFamily); + UTIL.getAdmin().createTable(builder.build()); + tool.setConf(UTIL.getConfiguration()); } - /** - * Thread that does full scans of the table looking for any partially completed - * rows. - */ - public static class AtomicScanReader extends RepeatingTestThread { - byte targetFamilies[][]; - Table table; - Connection connection; - AtomicLong numScans = new AtomicLong(); - AtomicLong numRowsScanned = new AtomicLong(); - - public AtomicScanReader(TestContext ctx, - byte targetFamilies[][], ExecutorService pool) throws IOException { - super(ctx); - this.targetFamilies = targetFamilies; - connection = ConnectionFactory.createConnection(ctx.getConf(), pool); - table = connection.getTable(TABLE_NAME); - } - - public void doAnAction() throws Exception { - Scan s = new Scan(); - for (byte[] family : targetFamilies) { - s.addFamily(family); - } - ResultScanner scanner = table.getScanner(s); - - for (Result res : scanner) { - byte[] gotValue = null; - - for (byte[] family : targetFamilies) { - for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { - byte qualifier[] = Bytes.toBytes("col" + i); - byte thisValue[] = res.getValue(family, qualifier); - if (gotValue != null && !Bytes.equals(gotValue, thisValue)) { - gotFailure(gotValue, res); - } - gotValue = thisValue; - } - } - numRowsScanned.getAndIncrement(); - } - numScans.getAndIncrement(); - } - - @Override - public void workDone() throws IOException { - try { - table.close(); - } finally { - connection.close(); - } - } - - private void gotFailure(byte[] expected, Result res) { - StringBuilder 
msg = new StringBuilder(); - msg.append("Failed after ").append(numRowsScanned).append("!"); - msg.append("Expected=").append(Bytes.toStringBinary(expected)); - msg.append("Got:\n"); - for (Cell kv : res.listCells()) { - msg.append(kv.toString()); - msg.append(" val= "); - msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv))); - msg.append("\n"); - } - throw new RuntimeException(msg.toString()); - } + @After + public void tearDown() throws Exception { + UTIL.deleteTable(TABLE_NAME); } - public void runTestAtomicity(long millisToRun, - int numWriters, - int numGetters, - int numScanners, + private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners, int numUniqueRows) throws Exception { runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false); } - public void runTestAtomicity(long millisToRun, - int numWriters, - int numGetters, - int numScanners, - int numUniqueRows, - final boolean systemTest) throws Exception { - runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, systemTest, - false); - } - - public void runTestAtomicity(long millisToRun, - int numWriters, - int numGetters, - int numScanners, - int numUniqueRows, - final boolean systemTest, - final boolean useMob) throws Exception { - - createTableIfMissing(useMob); - // set the max threads to avoid java.lang.OutOfMemoryError: unable to create new native thread - util.getConfiguration().setInt("hbase.hconnection.threads.max", 40); - TestContext ctx = new TestContext(util.getConfiguration()); - - byte rows[][] = new byte[numUniqueRows][]; - for (int i = 0; i < numUniqueRows; i++) { - rows[i] = Bytes.toBytes("test_row_" + i); - } - - List<AtomicityWriter> writers = Lists.newArrayList(); - for (int i = 0; i < numWriters; i++) { - AtomicityWriter writer = new AtomicityWriter( - ctx, rows, FAMILIES, getSharedThreadPool()); - writers.add(writer); - ctx.addThread(writer); - } - // Add a flusher - ctx.addThread(new 
RepeatingTestThread(ctx) { - Admin admin = util.getAdmin(); - public void doAnAction() throws Exception { - try { - admin.flush(TABLE_NAME); - } catch(IOException ioe) { - LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe)); - } - // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally, - // we would flush as often as possible. On a running cluster, this isn't practical: - // (1) we will cause a lot of load due to all the flushing and compacting - // (2) we cannot change the flushing/compacting related Configuration options to try to - // alleviate this - // (3) it is an unrealistic workload, since no one would actually flush that often. - // Therefore, let's flush every minute to have more flushes than usual, but not overload - // the running cluster. - if (systemTest) Thread.sleep(60000); - } - }); - - List<AtomicGetReader> getters = Lists.newArrayList(); - for (int i = 0; i < numGetters; i++) { - AtomicGetReader getter = new AtomicGetReader( - ctx, rows[i % numUniqueRows], FAMILIES, getSharedThreadPool()); - getters.add(getter); - ctx.addThread(getter); - } - - List<AtomicScanReader> scanners = Lists.newArrayList(); - for (int i = 0; i < numScanners; i++) { - AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, getSharedThreadPool()); - scanners.add(scanner); - ctx.addThread(scanner); - } - - ctx.startThreads(); - ctx.waitFor(millisToRun); - ctx.stop(); - - LOG.info("Finished test. 
Writers:"); - for (AtomicityWriter writer : writers) { - LOG.info(" wrote " + writer.numWritten.get()); - } - LOG.info("Readers:"); - for (AtomicGetReader reader : getters) { - LOG.info(" read " + reader.numRead.get()); - } - LOG.info("Scanners:"); - for (AtomicScanReader scanner : scanners) { - LOG.info(" scanned " + scanner.numScans.get()); - LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); + private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners, + int numUniqueRows, boolean useMob) throws Exception { + List<String> args = Lists.newArrayList("-millis", String.valueOf(millisToRun), "-numWriters", + String.valueOf(numWriters), "-numGetters", String.valueOf(numGetters), "-numScanners", + String.valueOf(numScanners), "-numUniqueRows", String.valueOf(numUniqueRows), "-crazyFlush"); + if (useMob) { + args.add("-useMob"); } - } - - @Before - public void setUp() throws Exception { - util.startMiniCluster(1); - } - - @After - public void tearDown() throws Exception { - util.shutdownMiniCluster(); + tool.run(args.toArray(new String[0])); } @Test @@ -465,67 +134,16 @@ public class TestAcidGuarantees implements Tool { @Test public void testMobGetAtomicity() throws Exception { - boolean systemTest = false; - boolean useMob = true; - runTestAtomicity(20000, 5, 5, 0, 3, systemTest, useMob); + runTestAtomicity(20000, 5, 5, 0, 3, true); } @Test public void testMobScanAtomicity() throws Exception { - boolean systemTest = false; - boolean useMob = true; - runTestAtomicity(20000, 5, 0, 5, 3, systemTest, useMob); + runTestAtomicity(20000, 5, 0, 5, 3, true); } @Test public void testMobMixedAtomicity() throws Exception { - boolean systemTest = false; - boolean useMob = true; - runTestAtomicity(20000, 5, 2, 2, 3, systemTest, useMob); - } - - //////////////////////////////////////////////////////////////////////////// - // Tool interface - //////////////////////////////////////////////////////////////////////////// - @Override - 
public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration c) { - this.conf = c; - this.util = new HBaseTestingUtility(c); + runTestAtomicity(20000, 5, 2, 2, 3, true); } - - @Override - public int run(String[] arg0) throws Exception { - Configuration c = getConf(); - int millis = c.getInt("millis", 5000); - int numWriters = c.getInt("numWriters", 50); - int numGetters = c.getInt("numGetters", 2); - int numScanners = c.getInt("numScanners", 2); - int numUniqueRows = c.getInt("numUniqueRows", 3); - boolean useMob = c.getBoolean("useMob",false); - assert useMob && c.getInt("hfile.format.version", 2) == 3 : "Mob runs must use hfile v3"; - runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob); - return 0; - } - - public static void main(String args[]) throws Exception { - Configuration c = HBaseConfiguration.create(); - int status; - try { - TestAcidGuarantees test = new TestAcidGuarantees(CompactingMemStore - .COMPACTING_MEMSTORE_TYPE_DEFAULT); - status = ToolRunner.run(c, test, args); - } catch (Exception e) { - LOG.error("Exiting due to error", e); - status = -1; - } - System.exit(status); - } - - } -