Revert "HBASE-20601 Add multiPut support and other miscellaneous to PE"
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4005a0c4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4005a0c4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4005a0c4 Branch: refs/heads/HBASE-19064 Commit: 4005a0c4d368a9846f60026614a425192cc247ec Parents: 320a333 Author: Allan Yang <allan...@apache.org> Authored: Thu May 24 19:37:21 2018 +0800 Committer: Allan Yang <allan...@163.com> Committed: Thu May 24 19:37:21 2018 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/PerformanceEvaluation.java | 344 ++++++++----------- .../hadoop/hbase/TestPerformanceEvaluation.java | 46 --- 2 files changed, 140 insertions(+), 250 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/4005a0c4/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 26ab88d..42acb5c 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -438,17 +438,6 @@ public class PerformanceEvaluation extends Configured implements Tool { return splits; } - static void setupConnectionCount(final TestOptions opts) { - if (opts.oneCon) { - opts.connCount = 1; - } else { - if (opts.connCount == -1) { - // set to thread number if connCount is not set - opts.connCount = opts.numClientThreads; - } - } - } - /* * Run all clients in this vm each to its own thread. */ @@ -461,23 +450,14 @@ public class PerformanceEvaluation extends Configured implements Tool { RunResult[] results = new RunResult[opts.numClientThreads]; ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); - setupConnectionCount(opts); - final Connection[] cons = new Connection[opts.connCount]; - final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; - for (int i = 0; i < opts.connCount; i++) { - cons[i] = ConnectionFactory.createConnection(conf); - asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); - } - LOG.info("Created " + opts.connCount + " connections for " + - opts.numClientThreads + " threads"); + final Connection con = ConnectionFactory.createConnection(conf); + final AsyncConnection asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); for (int i = 0; i < threads.length; i++) { final int index = i; threads[i] = pool.submit(new Callable<RunResult>() { @Override public RunResult call() throws Exception { TestOptions threadOpts = new TestOptions(opts); - final Connection con = cons[index % cons.length]; - final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { @Override @@ -505,26 +485,16 @@ public class PerformanceEvaluation extends Configured implements Tool { + Arrays.toString(results)); Arrays.sort(results); long total = 0; - float avgLatency = 0 ; - float avgTPS = 0; for (RunResult result : results) { total += result.duration; - avgLatency += result.hist.getSnapshot().getMean(); - avgTPS += opts.perClientRunRows * 1.0f / result.duration; } - avgTPS *= 1000; // ms to second - avgLatency = avgLatency / results.length; - LOG.info("[" + test + " duration ]" + LOG.info("[" + test + "]" + "\tMin: " + results[0] + "ms" + "\tMax: " + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); - LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); - LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); - for (int i = 0; i < opts.connCount; i++) { - cons[i].close(); - asyncCons[i].close(); - } + con.close(); + asyncCon.close(); return results; } @@ -678,12 +648,10 @@ public class PerformanceEvaluation extends Configured implements Tool { boolean writeToWAL = true; boolean autoFlush = false; boolean oneCon = false; - int connCount = -1; //wil decide the actual num later boolean useTags = false; int noOfTags = 1; boolean reportLatency = false; int multiGet = 0; - int multiPut = 0; int randomSleep = 0; boolean inMemoryCF = false; int presplitRegions = 0; @@ -732,12 +700,10 @@ public class PerformanceEvaluation extends Configured implements Tool { this.writeToWAL = that.writeToWAL; this.autoFlush = that.autoFlush; this.oneCon = that.oneCon; - this.connCount = that.connCount; this.useTags = that.useTags; this.noOfTags = that.noOfTags; this.reportLatency = that.reportLatency; this.multiGet = that.multiGet; - this.multiPut = that.multiPut; this.inMemoryCF = that.inMemoryCF; this.presplitRegions = that.presplitRegions; this.replicas = that.replicas; @@ -892,14 +858,6 @@ public class PerformanceEvaluation extends Configured implements Tool { this.oneCon = oneCon; } - public int getConnCount() { - return connCount; - } - - public void setConnCount(int connCount) { - this.connCount = connCount; - } - public void setUseTags(boolean useTags) { this.useTags = useTags; } @@ -916,10 +874,6 @@ public class PerformanceEvaluation extends Configured implements Tool { this.multiGet = multiGet; } - public void setMultiPut(int multiPut) { - this.multiPut = multiPut; - } - public void setInMemoryCF(boolean inMemoryCF) { this.inMemoryCF = inMemoryCF; } @@ -1028,10 +982,6 @@ public class PerformanceEvaluation extends Configured implements Tool { return multiGet; } - public int getMultiPut() { - return multiPut; - } - public boolean isInMemoryCF() { return inMemoryCF; } @@ -1248,9 +1198,12 @@ public class PerformanceEvaluation extends Configured implements Tool { bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); + createConnection(); onStartup(); } + abstract void createConnection() throws IOException; + abstract void onStartup() throws IOException; void testTakedown() throws IOException { @@ -1264,14 +1217,10 @@ public class PerformanceEvaluation extends Configured implements Tool { latencyHistogram)); status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); - if (valueSizeHistogram.getCount() > 0) { - status.setStatus("ValueSize (bytes) : " - + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); - status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); - status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); - } else { - status.setStatus("No valueSize statistics available"); - } + status.setStatus("ValueSize (bytes) : " + + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); + status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); + status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); if (rpcCallsHistogram.getCount() > 0) { status.setStatus("rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); @@ -1297,11 +1246,13 @@ public class PerformanceEvaluation extends Configured implements Tool { YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); } } + closeConnection(); receiverHost.closeReceivers(); } abstract void onTakedown() throws IOException; + abstract void closeConnection() throws IOException; /* * Run test @@ -1341,16 +1292,14 @@ public class PerformanceEvaluation extends Configured implements Tool { for (int i = startRow; i < lastRow; i++) { if (i % everyN != 0) continue; long startTime = System.nanoTime(); - boolean requestSent = false; - try (TraceScope scope = TraceUtil.createTrace("test row");){ - requestSent = testRow(i); + try (TraceScope scope = TraceUtil.createTrace("test row")){ + testRow(i); } if ( (i - startRow) > opts.measureAfter) { - // If multiget or multiput is enabled, say set to 10, testRow() returns immediately - // first 9 times and sends the actual get request in the 10th iteration. - // We should only set latency when actual request is sent because otherwise - // it turns out to be 0. - if (requestSent) { + // If multiget is enabled, say set to 10, testRow() returns immediately first 9 times + // and sends the actual get request in the 10th iteration. We should only set latency + // when actual request is sent because otherwise it turns out to be 0. + if (opts.multiGet == 0 || (i - startRow + 1) % opts.multiGet == 0) { latencyHistogram.update((System.nanoTime() - startTime) / 1000); } if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { @@ -1375,15 +1324,11 @@ public class PerformanceEvaluation extends Configured implements Tool { return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); } - - /** - * Test for individual row. - * @param i Row index. - * @return true if the row was sent to server and need to record metrics. - * False if not, multiGet and multiPut e.g., the rows are sent - * to server only if enough gets/puts are gathered. - */ - abstract boolean testRow(final int i) throws IOException, InterruptedException; + /* + * Test for individual row. + * @param i Row index. + */ + abstract void testRow(final int i) throws IOException, InterruptedException; } static abstract class Test extends TestBase { @@ -1393,6 +1338,20 @@ public class PerformanceEvaluation extends Configured implements Tool { super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); this.connection = con; } + + @Override + void createConnection() throws IOException { + if (!opts.isOneCon()) { + this.connection = ConnectionFactory.createConnection(conf); + } + } + + @Override + void closeConnection() throws IOException { + if (!opts.isOneCon()) { + this.connection.close(); + } + } } static abstract class AsyncTest extends TestBase { @@ -1402,6 +1361,24 @@ public class PerformanceEvaluation extends Configured implements Tool { super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); this.connection = con; } + + @Override + void createConnection() { + if (!opts.isOneCon()) { + try { + this.connection = ConnectionFactory.createAsyncConnection(conf).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Failed to create async connection", e); + } + } + } + + @Override + void closeConnection() throws IOException { + if (!opts.isOneCon()) { + this.connection.close(); + } + } } static abstract class TableTest extends Test { @@ -1454,7 +1431,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException, InterruptedException { + void testRow(final int i) throws IOException, InterruptedException { if (opts.randomSleep > 0) { Thread.sleep(rd.nextInt(opts.randomSleep)); } @@ -1483,8 +1460,6 @@ public class PerformanceEvaluation extends Configured implements Tool { this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); updateValueSize(rs); this.gets.clear(); - } else { - return false; } } else { updateValueSize(this.table.get(get).get()); @@ -1492,7 +1467,6 @@ public class PerformanceEvaluation extends Configured implements Tool { } catch (ExecutionException e) { throw new IOException(e); } - return true; } public static RuntimeException runtime(Throwable e) { @@ -1526,15 +1500,43 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { - + static class AsyncRandomWriteTest extends AsyncTableTest { AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { super(con, options, status); } @Override - protected byte[] generateRow(final int i) { - return getRandomRow(this.rand, opts.totalRows); + void testRow(final int i) throws IOException, InterruptedException { + byte[] row = getRandomRow(this.rand, opts.totalRows); + Put put = new Put(row); + for (int family = 0; family < opts.families; family++) { + byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); + for (int column = 0; column < opts.columns; column++) { + byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); + byte[] value = generateData(this.rand, getValueLength(this.rand)); + if (opts.useTags) { + byte[] tag = generateData(this.rand, TAG_LENGTH); + Tag[] tags = new Tag[opts.noOfTags]; + for (int n = 0; n < opts.noOfTags; n++) { + Tag t = new ArrayBackedTag((byte) n, tag); + tags[n] = t; + } + KeyValue kv = + new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); + put.add(kv); + updateValueSize(kv.getValueLength()); + } else { + put.addColumn(familyName, qualifier, value); + updateValueSize(value.length); + } + } + } + put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + try { + table.put(put).get(); + } catch (ExecutionException e) { + throw new IOException(e); + } } } @@ -1563,7 +1565,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { if (this.testScanner == null) { Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) @@ -1587,7 +1589,6 @@ public class PerformanceEvaluation extends Configured implements Tool { } Result r = testScanner.next(); updateValueSize(r); - return true; } } @@ -1597,7 +1598,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException, InterruptedException { + void testRow(final int i) throws IOException, InterruptedException { Get get = new Get(format(i)); for (int family = 0; family < opts.families; family++) { byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); @@ -1618,29 +1619,17 @@ public class PerformanceEvaluation extends Configured implements Tool { } catch (ExecutionException e) { throw new IOException(e); } - return true; } } static class AsyncSequentialWriteTest extends AsyncTableTest { - private ArrayList<Put> puts; - AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { super(con, options, status); - if (opts.multiPut > 0) { - LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); - this.puts = new ArrayList<>(opts.multiPut); - } - } - - protected byte[] generateRow(final int i) { - return format(i); } @Override - @SuppressWarnings("ReturnValueIgnored") - boolean testRow(final int i) throws IOException, InterruptedException { - byte[] row = generateRow(i); + void testRow(final int i) throws IOException, InterruptedException { + byte[] row = format(i); Put put = new Put(row); for (int family = 0; family < opts.families; family++) { byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); @@ -1667,21 +1656,9 @@ public class PerformanceEvaluation extends Configured implements Tool { put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); try { table.put(put).get(); - if (opts.multiPut > 0) { - this.puts.add(put); - if (this.puts.size() == opts.multiPut) { - this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get)); - this.puts.clear(); - } else { - return false; - } - } else { - table.put(put).get(); - } } catch (ExecutionException e) { throw new IOException(e); } - return true; } } @@ -1714,7 +1691,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)) .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) @@ -1745,7 +1722,6 @@ public class PerformanceEvaluation extends Configured implements Tool { updateScanMetrics(s.getScanMetrics()); s.close(); } - return true; } @Override @@ -1762,7 +1738,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) @@ -1799,7 +1775,6 @@ public class PerformanceEvaluation extends Configured implements Tool { updateScanMetrics(s.getScanMetrics()); s.close(); } - return true; } protected abstract Pair<byte[],byte[]> getStartAndStopRow(); @@ -1876,7 +1851,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException, InterruptedException { + void testRow(final int i) throws IOException, InterruptedException { if (opts.randomSleep > 0) { Thread.sleep(rd.nextInt(opts.randomSleep)); } @@ -1903,13 +1878,10 @@ public class PerformanceEvaluation extends Configured implements Tool { Result [] rs = this.table.get(this.gets); updateValueSize(rs); this.gets.clear(); - } else { - return false; } } else { updateValueSize(this.table.get(get)); } - return true; } @Override @@ -1928,17 +1900,44 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomWriteTest extends SequentialWriteTest { + static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @Override - protected byte[] generateRow(final int i) { - return getRandomRow(this.rand, opts.totalRows); + void testRow(final int i) throws IOException { + byte[] row = getRandomRow(this.rand, opts.totalRows); + Put put = new Put(row); + for (int family = 0; family < opts.families; family++) { + byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); + for (int column = 0; column < opts.columns; column++) { + byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); + byte[] value = generateData(this.rand, getValueLength(this.rand)); + if (opts.useTags) { + byte[] tag = generateData(this.rand, TAG_LENGTH); + Tag[] tags = new Tag[opts.noOfTags]; + for (int n = 0; n < opts.noOfTags; n++) { + Tag t = new ArrayBackedTag((byte) n, tag); + tags[n] = t; + } + KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, + value, tags); + put.add(kv); + updateValueSize(kv.getValueLength()); + } else { + put.addColumn(familyName, qualifier, value); + updateValueSize(value.length); + } + } + } + put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + if (opts.autoFlush) { + table.put(put); + } else { + mutator.mutate(put); + } } - - } static class ScanTest extends TableTest { @@ -1958,7 +1957,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { if (this.testScanner == null) { Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) @@ -1981,7 +1980,6 @@ public class PerformanceEvaluation extends Configured implements Tool { } Result r = testScanner.next(); updateValueSize(r); - return true; } } @@ -2021,7 +2019,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { Increment increment = new Increment(format(i)); // unlike checkAndXXX tests, which make most sense to do on a single value, // if multiple families are specified for an increment test we assume it is @@ -2031,7 +2029,6 @@ public class PerformanceEvaluation extends Configured implements Tool { increment.addColumn(familyName, getQualifier(), 1l); } updateValueSize(this.table.increment(increment)); - return true; } } @@ -2041,7 +2038,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { byte [] bytes = format(i); Append append = new Append(bytes); // unlike checkAndXXX tests, which make most sense to do on a single value, @@ -2052,7 +2049,6 @@ public class PerformanceEvaluation extends Configured implements Tool { append.addColumn(familyName, getQualifier(), bytes); } updateValueSize(this.table.append(append)); - return true; } } @@ -2062,7 +2058,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { final byte [] bytes = format(i); // checkAndXXX tests operate on only a single value // Put a known value so when we go to check it, it is there. @@ -2073,7 +2069,6 @@ public class PerformanceEvaluation extends Configured implements Tool { mutations.add(put); this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) .ifEquals(bytes).thenMutate(mutations); - return true; } } @@ -2083,7 +2078,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { final byte [] bytes = format(i); // checkAndXXX tests operate on only a single value // Put a known value so when we go to check it, it is there. @@ -2092,7 +2087,6 @@ public class PerformanceEvaluation extends Configured implements Tool { this.table.put(put); this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) .ifEquals(bytes).thenPut(put); - return true; } } @@ -2102,7 +2096,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { final byte [] bytes = format(i); // checkAndXXX tests operate on only a single value // Put a known value so when we go to check it, it is there. @@ -2113,7 +2107,6 @@ public class PerformanceEvaluation extends Configured implements Tool { delete.addColumn(FAMILY_ZERO, getQualifier()); this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()) .ifEquals(bytes).thenDelete(delete); - return true; } } @@ -2123,7 +2116,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + void testRow(final int i) throws IOException { Get get = new Get(format(i)); for (int family = 0; family < opts.families; family++) { byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); @@ -2140,29 +2133,17 @@ public class PerformanceEvaluation extends Configured implements Tool { get.setFilter(new FilterAllFilter()); } updateValueSize(table.get(get)); - return true; } } static class SequentialWriteTest extends BufferedMutatorTest { - private ArrayList<Put> puts; - - SequentialWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); - if (opts.multiPut > 0) { - LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); - this.puts = new ArrayList<>(opts.multiPut); - } - } - - protected byte[] generateRow(final int i) { - return format(i); } @Override - boolean testRow(final int i) throws IOException { - byte[] row = generateRow(i); + void testRow(final int i) throws IOException { + byte[] row = format(i); Put put = new Put(row); for (int family = 0; family < opts.families; family++) { byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); @@ -2188,21 +2169,10 @@ public class PerformanceEvaluation extends Configured implements Tool { } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); if (opts.autoFlush) { - if (opts.multiPut > 0) { - this.puts.add(put); - if (this.puts.size() == opts.multiPut) { - table.put(this.puts); - this.puts.clear(); - } else { - return false; - } - } else { - table.put(put); - } + table.put(put); } else { mutator.mutate(put); } - return true; } } @@ -2214,7 +2184,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(int i) throws IOException { + void testRow(int i) throws IOException { byte[] value = generateData(this.rand, getValueLength(this.rand)); Scan scan = constructScan(value); ResultScanner scanner = null; @@ -2229,7 +2199,6 @@ public class PerformanceEvaluation extends Configured implements Tool { scanner.close(); } } - return true; } protected Scan constructScan(byte[] valuePrefix) throws IOException { @@ -2411,11 +2380,6 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); System.err.println(" oneCon all the threads share the same connection. Default: False"); - System.err.println(" connCount connections all threads share. " - + "For example, if set to 2, then all thread share 2 connection. " - + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, " - + "if not, connCount=thread number"); - System.err.println(" sampleRate Execute test on a sample of total " + "rows. Only supported by randomRead. Default: 1.0"); System.err.println(" period Report every 'period' rows: " + @@ -2452,8 +2416,6 @@ public class PerformanceEvaluation extends Configured implements Tool { "'valueSize' in zipf form: Default: Not set."); System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); System.err.println(" autoFlush Set autoFlush on htable. Default: False"); - System.err.println(" multiPut Batch puts together into groups of N. Only supported " + - "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); System.err.println(" presplit Create presplit table. If a table with same name exists," + " it'll be deleted and recreated (instead of verifying count of its existing regions). " + "Recommended for accurate perf analysis (see guide). Default: disabled"); @@ -2606,29 +2568,12 @@ public class PerformanceEvaluation extends Configured implements Tool { final String autoFlush = "--autoFlush="; if (cmd.startsWith(autoFlush)) { opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); - if (!opts.autoFlush && opts.multiPut > 0) { - throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); - } continue; } final String onceCon = "--oneCon="; if (cmd.startsWith(onceCon)) { opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); - if (opts.oneCon && opts.connCount > 1) { - throw new IllegalArgumentException("oneCon is set to true, " - + "connCount should not bigger than 1"); - } - continue; - } - - final String connCount = "--connCount="; - if (cmd.startsWith(connCount)) { - opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); - if (opts.oneCon && opts.connCount > 1) { - throw new IllegalArgumentException("oneCon is set to true, " - + "connCount should not bigger than 1"); - } continue; } @@ -2644,15 +2589,6 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } - final String multiPut = "--multiPut="; - if (cmd.startsWith(multiPut)) { - opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); - if (!opts.autoFlush && opts.multiPut > 0) { - throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); - } - continue; - } - final String useTags = "--usetags="; if (cmd.startsWith(useTags)) { opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); http://git-wip-us.apache.org/repos/asf/hbase/blob/4005a0c4/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java index 2e1ecd0..b2a89c8 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java @@ -243,50 +243,4 @@ public class TestPerformanceEvaluation { assertTrue(e.getCause() instanceof NoSuchElementException); } } - - @Test - public void testParseOptsMultiPuts() { - Queue<String> opts = new LinkedList<>(); - String cmdName = "sequentialWrite"; - opts.offer("--multiPut=10"); - opts.offer(cmdName); - opts.offer("64"); - PerformanceEvaluation.TestOptions options = null; - try { - options = PerformanceEvaluation.parseOpts(opts); - fail("should fail"); - } catch (IllegalArgumentException e) { - System.out.println(e.getMessage()); - } - ((LinkedList<String>) opts).offerFirst("--multiPut=10"); - ((LinkedList<String>) opts).offerFirst("--autoFlush=true"); - options = PerformanceEvaluation.parseOpts(opts); - assertNotNull(options); - assertNotNull(options.getCmdName()); - assertEquals(cmdName, options.getCmdName()); - assertTrue(options.getMultiPut() == 10); - } - - @Test - public void testParseOptsConnCount() { - Queue<String> opts = new LinkedList<>(); - String cmdName = "sequentialWrite"; - opts.offer("--oneCon=true"); - opts.offer("--connCount=10"); - opts.offer(cmdName); - opts.offer("64"); - PerformanceEvaluation.TestOptions options = null; - try { - options = PerformanceEvaluation.parseOpts(opts); - fail("should fail"); - } catch (IllegalArgumentException e) { - System.out.println(e.getMessage()); - } - ((LinkedList<String>) opts).offerFirst("--connCount=10"); - options = PerformanceEvaluation.parseOpts(opts); - assertNotNull(options); - assertNotNull(options.getCmdName()); - assertEquals(cmdName, options.getCmdName()); - assertTrue(options.getConnCount() == 10); - } }