HBASE-10616. Integration test for multi-get calls git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1577372 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/41116848 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/41116848 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/41116848 Branch: refs/heads/master Commit: 41116848e5a1f9ceb3558ce654f5aff2064e2158 Parents: a0a4744 Author: Devaraj Das <d...@apache.org> Authored: Thu Mar 13 23:22:33 2014 +0000 Committer: Enis Soztutar <e...@apache.org> Committed: Fri Jun 27 16:39:38 2014 -0700 ---------------------------------------------------------------------- ...undedMultiGetRequestsWithRegionReplicas.java | 75 +++++++++ ...stTimeBoundedRequestsWithRegionReplicas.java | 7 +- .../apache/hadoop/hbase/util/LoadTestTool.java | 13 +- .../hadoop/hbase/util/MultiThreadedReader.java | 155 +++++++++++++------ .../hbase/util/MultiThreadedReaderWithACL.java | 2 +- 5 files changed, 203 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/41116848/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.java new file mode 100644 index 0000000..de12091 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.test; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.IntegrationTests; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.hbase.util.MultiThreadedReader; +import org.apache.hadoop.util.ToolRunner; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * Extends {@link IntegrationTestTimeBoundedRequestsWithRegionReplicas} for multi-gets. + * Besides the options already talked about in IntegrationTestTimeBoundedRequestsWithRegionReplicas + * the additional options here are: + * <pre> + * -DIntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.multiget_batchsize=100 + * -DIntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.num_regions_per_server=5 + * </pre> + * The multiget_batchsize when set to 1 will issue normal GETs. + * The num_regions_per_server argument indirectly impacts the region size (for a given number of + * num_keys_per_server). That in conjunction with multiget_batchsize would have different behaviors + * - the batch of gets goes to the same region or to multiple regions.
+ */ +@Category(IntegrationTests.class) +public class IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas + extends IntegrationTestTimeBoundedRequestsWithRegionReplicas { + + @Override + protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey, + long numKeys) { + List<String> args = Lists.newArrayList(super.getArgsForLoadTestTool( + mode, modeSpecificArg, startKey, numKeys)); + String clazz = this.getClass().getSimpleName(); + args.add("-" + LoadTestTool.OPT_MULTIGET); + args.add(conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_MULTIGET), + Integer.toString(MultiThreadedReader.DEFAULT_BATCH_SIZE))); + + args.add("-" + LoadTestTool.OPT_NUM_REGIONS_PER_SERVER); + args.add(conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_NUM_REGIONS_PER_SERVER), + Integer.toString(LoadTestTool.DEFAULT_NUM_REGIONS_PER_SERVER))); + + return args.toArray(new String[args.size()]); + } + + public static void main(String args[]) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, + new IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas(), args); + System.exit(ret); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/41116848/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java index 9825ea7..dac03a8 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java @@ -323,9 +323,10 
@@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr } @Override - protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano, - Result result, HTable table, boolean isNullExpected) throws IOException { - super.verifyResultsAndUpdateMetrics(verify, rowKey, elapsedNano, result, table, isNullExpected); + protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano, + Result[] results, HTable table, boolean isNullExpected) + throws IOException { + super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected); // we actually do not timeout and cancel the reads after timeout. We just wait for the RPC // to complete, but if the request took longer than timeout, we treat that as error. if (elapsedNano > timeoutNano) { http://git-wip-us.apache.org/repos/asf/hbase/blob/41116848/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 34980c4..d193a9c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -133,6 +133,7 @@ public class LoadTestTool extends AbstractHBaseTool { protected static final String OPT_WRITE = "write"; protected static final String OPT_MAX_READ_ERRORS = "max_read_errors"; protected static final String OPT_MULTIPUT = "multiput"; + public static final String OPT_MULTIGET = "multiget_batchsize"; protected static final String OPT_NUM_KEYS = "num_keys"; protected static final String OPT_READ = "read"; protected static final String OPT_START_KEY = "start_key"; @@ -154,7 +155,7 @@ public class LoadTestTool extends AbstractHBaseTool { public static final String 
OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server"; protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE = "Desired number of regions per region server. Defaults to 5."; - protected static int DEFAULT_NUM_REGIONS_PER_SERVER = 5; + public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5; public static final String OPT_REGION_REPLICATION = "region_replication"; protected static final String OPT_REGION_REPLICATION_USAGE = @@ -196,6 +197,7 @@ public class LoadTestTool extends AbstractHBaseTool { // Reader options private int numReaderThreads = DEFAULT_NUM_THREADS; private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; + private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE; private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; private int verifyPercent; @@ -298,6 +300,8 @@ public class LoadTestTool extends AbstractHBaseTool { addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " + "to tolerate before terminating all reader threads. The default is " + MultiThreadedReader.DEFAULT_MAX_ERRORS + "."); + addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " + + "separate gets for every column in a row"); addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " + "reads and writes for concurrent write/read workload. 
The default " + "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + "."); @@ -432,6 +436,12 @@ public class LoadTestTool extends AbstractHBaseTool { 0, Integer.MAX_VALUE); } + if (cmd.hasOption(OPT_MULTIGET)) { + multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), + 0, Integer.MAX_VALUE); + } + + System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize); System.out.println("Percent of keys to verify: " + verifyPercent); System.out.println("Reader threads: " + numReaderThreads); } @@ -620,6 +630,7 @@ public class LoadTestTool extends AbstractHBaseTool { } readerThreads.setMaxErrors(maxReadErrors); readerThreads.setKeyWindow(keyWindow); + readerThreads.setMultiGetBatchSize(multiGetBatchSize); } if (isUpdate && isWrite) { http://git-wip-us.apache.org/repos/asf/hbase/blob/41116848/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index b0d44fd..e6d4807 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -64,13 +65,18 @@ public class MultiThreadedReader extends MultiThreadedAction */ public static final int DEFAULT_KEY_WINDOW = 0; + /** + * Default batch size for multigets + */ + public static final int DEFAULT_BATCH_SIZE = 1; //translates to simple GET (no multi GET) + protected AtomicLong numKeysVerified = new AtomicLong(0); protected AtomicLong numReadErrors = new AtomicLong(0); protected AtomicLong numReadFailures = new 
AtomicLong(0); protected AtomicLong nullResult = new AtomicLong(0); - private int maxErrors = DEFAULT_MAX_ERRORS; private int keyWindow = DEFAULT_KEY_WINDOW; + private int batchSize = DEFAULT_BATCH_SIZE; public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, double verifyPercent) { @@ -91,6 +97,10 @@ public class MultiThreadedReader extends MultiThreadedAction this.keyWindow = keyWindow; } + public void setMultiGetBatchSize(int batchSize) { + this.batchSize = batchSize; + } + @Override public void start(long startKey, long endKey, int numThreads) throws IOException { super.start(startKey, endKey, numThreads); @@ -169,28 +179,38 @@ public class MultiThreadedReader extends MultiThreadedAction startTimeMs = System.currentTimeMillis(); curKey = startKey; + long [] keysForThisReader = new long[batchSize]; + int readingRandomKeyStartIndex = -1; while (curKey < endKey && !aborted) { - long k = getNextKeyToRead(); - - // A sanity check for the key range. - if (k < startKey || k >= endKey) { - numReadErrors.incrementAndGet(); - throw new AssertionError("Load tester logic error: proposed key " + - "to read " + k + " is out of range (startKey=" + startKey + - ", endKey=" + endKey + ")"); - } - - if (k % numThreads != readerId || - writer != null && writer.failedToWriteKey(k)) { - // Skip keys that this thread should not read, as well as the keys - // that we know the writer failed to write. - continue; - } - - readKey(k); - if (k == curKey - 1 && !readingRandomKey) { - // We have verified another unique key. 
- numUniqueKeysVerified.incrementAndGet(); + int numKeys = 0; + // if multiGet, loop until we have the number of keys equal to the batch size + do { + long k = getNextKeyToRead(); + if (k < startKey || k >= endKey) { + numReadErrors.incrementAndGet(); + throw new AssertionError("Load tester logic error: proposed key " + + "to read " + k + " is out of range (startKey=" + startKey + + ", endKey=" + endKey + ")"); + } + if (k % numThreads != readerId || + writer != null && writer.failedToWriteKey(k)) { + // Skip keys that this thread should not read, as well as the keys + // that we know the writer failed to write. + continue; + } + keysForThisReader[numKeys] = k; + if (readingRandomKey && readingRandomKeyStartIndex == -1) { + //store the first index of a random read + readingRandomKeyStartIndex = numKeys; + } + numKeys++; + } while (numKeys < batchSize); + + if (numKeys > 1) { //meaning there is some key to read + readKey(keysForThisReader); + // We have verified some unique key(s). + numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ? 
+ numKeys : readingRandomKeyStartIndex); } } } @@ -240,22 +260,44 @@ public class MultiThreadedReader extends MultiThreadedAction % (maxKeyToRead - startKey + 1); } - private Get readKey(long keyToRead) { - Get get = null; - try { - get = createGet(keyToRead); - queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead); - } catch (IOException e) { - numReadFailures.addAndGet(1); - LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") - + ", time from start: " - + (System.currentTimeMillis() - startTimeMs) + " ms"); - if (printExceptionTrace) { - LOG.warn(e); - printExceptionTrace = false; + private Get[] readKey(long[] keysToRead) { + Get [] gets = new Get[keysToRead.length]; + int i = 0; + for (long keyToRead : keysToRead) { + try { + gets[i] = createGet(keyToRead); + if (keysToRead.length == 1) { + queryKey(gets[i], RandomUtils.nextInt(100) < verifyPercent, keyToRead); + } + i++; + } catch (IOException e) { + numReadFailures.addAndGet(1); + LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + + ", time from start: " + + (System.currentTimeMillis() - startTimeMs) + " ms"); + if (printExceptionTrace) { + LOG.warn(e); + printExceptionTrace = false; + } } } - return get; + if (keysToRead.length > 1) { + try { + queryKey(gets, RandomUtils.nextInt(100) < verifyPercent, keysToRead); + } catch (IOException e) { + numReadFailures.addAndGet(gets.length); + for (long keyToRead : keysToRead) { + LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + + ", time from start: " + + (System.currentTimeMillis() - startTimeMs) + " ms"); + } + if (printExceptionTrace) { + LOG.warn(e); + printExceptionTrace = false; + } + } + } + return gets; } protected Get createGet(long keyToRead) throws IOException { @@ -278,28 +320,53 @@ public class MultiThreadedReader extends MultiThreadedAction return get; } - public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { - String rowKey = 
Bytes.toString(get.getRow()); + public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException { + // read the data + long start = System.nanoTime(); + // Uses multi/batch gets + Result[] results = table.get(Arrays.asList(gets)); + long end = System.nanoTime(); + verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false); + } + public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { // read the data + long start = System.nanoTime(); + // Uses simple get Result result = table.get(get); long end = System.nanoTime(); - verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, table, false); + verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false); } - protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano, - Result result, HTable table, boolean isNullExpected) + protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano, + Result[] results, HTable table, boolean isNullExpected) throws IOException { totalOpTimeMs.addAndGet(elapsedNano / 1000000); - numKeys.addAndGet(1); + numKeys.addAndGet(gets.length); + int i = 0; + for (Result result : results) { + verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table, + isNullExpected); + } + } + + protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano, + Result result, HTable table, boolean isNullExpected) + throws IOException { + verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano, + new Result[]{result}, table, isNullExpected); + } + + private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, + Result result, HTable table, boolean isNullExpected) throws IOException { if (!result.isEmpty()) { if (verify) { numKeysVerified.incrementAndGet(); } } else { - HRegionLocation hloc = table.getRegionLocation( - Bytes.toBytes(rowKey)); + HRegionLocation hloc = 
table.getRegionLocation(get.getRow()); + String rowKey = Bytes.toString(get.getRow()); LOG.info("Key = " + rowKey + ", RegionServer: " + hloc.getHostname()); if(isNullExpected) { http://git-wip-us.apache.org/repos/asf/hbase/blob/41116848/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java index df59547..3e06393 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java @@ -110,7 +110,7 @@ public class MultiThreadedReaderWithACL extends MultiThreadedReader { boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0); LOG.info("Read happening from ACL " + isNullExpected); long end = System.nanoTime(); - verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, localTable, isNullExpected); + verifyResultsAndUpdateMetrics(verify, get, end - start, result, localTable, isNullExpected); } catch (IOException e) { recordFailure(keyToRead); }