Revert "HBASE-20601 Add multiPut support and other miscellaneous to PE"


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4005a0c4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4005a0c4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4005a0c4

Branch: refs/heads/HBASE-19064
Commit: 4005a0c4d368a9846f60026614a425192cc247ec
Parents: 320a333
Author: Allan Yang <allan...@apache.org>
Authored: Thu May 24 19:37:21 2018 +0800
Committer: Allan Yang <allan...@163.com>
Committed: Thu May 24 19:37:21 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/PerformanceEvaluation.java     | 344 ++++++++-----------
 .../hadoop/hbase/TestPerformanceEvaluation.java |  46 ---
 2 files changed, 140 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4005a0c4/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index 26ab88d..42acb5c 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -438,17 +438,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     return splits;
   }
 
-  static void setupConnectionCount(final TestOptions opts) {
-    if (opts.oneCon) {
-      opts.connCount = 1;
-    } else {
-      if (opts.connCount == -1) {
-        // set to thread number if connCount is not set
-        opts.connCount = opts.numClientThreads;
-      }
-    }
-  }
-
   /*
    * Run all clients in this vm each to its own thread.
    */
@@ -461,23 +450,14 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     RunResult[] results = new RunResult[opts.numClientThreads];
     ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
       new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
-    setupConnectionCount(opts);
-    final Connection[] cons = new Connection[opts.connCount];
-    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
-    for (int i = 0; i < opts.connCount; i++) {
-      cons[i] = ConnectionFactory.createConnection(conf);
-      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
-    }
-    LOG.info("Created " + opts.connCount + " connections for " +
-        opts.numClientThreads + " threads");
+    final Connection con = ConnectionFactory.createConnection(conf);
+    final AsyncConnection asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
     for (int i = 0; i < threads.length; i++) {
       final int index = i;
       threads[i] = pool.submit(new Callable<RunResult>() {
         @Override
         public RunResult call() throws Exception {
           TestOptions threadOpts = new TestOptions(opts);
-          final Connection con = cons[index % cons.length];
-          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
           if (threadOpts.startRow == 0) threadOpts.startRow = index * 
threadOpts.perClientRunRows;
           RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, 
new Status() {
             @Override
@@ -505,26 +485,16 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
              + Arrays.toString(results));
     Arrays.sort(results);
     long total = 0;
-    float avgLatency = 0 ;
-    float avgTPS = 0;
     for (RunResult result : results) {
       total += result.duration;
-      avgLatency += result.hist.getSnapshot().getMean();
-      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
     }
-    avgTPS *= 1000; // ms to second
-    avgLatency = avgLatency / results.length;
-    LOG.info("[" + test + " duration ]"
+    LOG.info("[" + test + "]"
       + "\tMin: " + results[0] + "ms"
       + "\tMax: " + results[results.length - 1] + "ms"
       + "\tAvg: " + (total / results.length) + "ms");
-    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
-    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
-    for (int i = 0; i < opts.connCount; i++) {
-      cons[i].close();
-      asyncCons[i].close();
-    }
 
+    con.close();
+    asyncCon.close();
 
     return results;
   }
@@ -678,12 +648,10 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     boolean writeToWAL = true;
     boolean autoFlush = false;
     boolean oneCon = false;
-    int connCount = -1; //wil decide the actual num later
     boolean useTags = false;
     int noOfTags = 1;
     boolean reportLatency = false;
     int multiGet = 0;
-    int multiPut = 0;
     int randomSleep = 0;
     boolean inMemoryCF = false;
     int presplitRegions = 0;
@@ -732,12 +700,10 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       this.writeToWAL = that.writeToWAL;
       this.autoFlush = that.autoFlush;
       this.oneCon = that.oneCon;
-      this.connCount = that.connCount;
       this.useTags = that.useTags;
       this.noOfTags = that.noOfTags;
       this.reportLatency = that.reportLatency;
       this.multiGet = that.multiGet;
-      this.multiPut = that.multiPut;
       this.inMemoryCF = that.inMemoryCF;
       this.presplitRegions = that.presplitRegions;
       this.replicas = that.replicas;
@@ -892,14 +858,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       this.oneCon = oneCon;
     }
 
-    public int getConnCount() {
-      return connCount;
-    }
-
-    public void setConnCount(int connCount) {
-      this.connCount = connCount;
-    }
-
     public void setUseTags(boolean useTags) {
       this.useTags = useTags;
     }
@@ -916,10 +874,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       this.multiGet = multiGet;
     }
 
-    public void setMultiPut(int multiPut) {
-      this.multiPut = multiPut;
-    }
-
     public void setInMemoryCF(boolean inMemoryCF) {
       this.inMemoryCF = inMemoryCF;
     }
@@ -1028,10 +982,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       return multiGet;
     }
 
-    public int getMultiPut() {
-      return multiPut;
-    }
-
     public boolean isInMemoryCF() {
       return inMemoryCF;
     }
@@ -1248,9 +1198,12 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new 
UniformReservoir(1024 * 500));
       bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new 
UniformReservoir(1024 * 500));
 
+      createConnection();
       onStartup();
     }
 
+    abstract void createConnection() throws IOException;
+
     abstract void onStartup() throws IOException;
 
     void testTakedown() throws IOException {
@@ -1264,14 +1217,10 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
             latencyHistogram));
         status.setStatus("Num measures (latency) : " + 
latencyHistogram.getCount());
         
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
-        if (valueSizeHistogram.getCount() > 0) {
-          status.setStatus("ValueSize (bytes) : "
-              + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
-          status.setStatus("Num measures (ValueSize): " + 
valueSizeHistogram.getCount());
-          
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
-        } else {
-          status.setStatus("No valueSize statistics available");
-        }
+        status.setStatus("ValueSize (bytes) : "
+            + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
+        status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
+        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
         if (rpcCallsHistogram.getCount() > 0) {
           status.setStatus("rpcCalls (count): " +
               YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
@@ -1297,11 +1246,13 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
               
YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
         }
       }
+      closeConnection();
       receiverHost.closeReceivers();
     }
 
     abstract void onTakedown() throws IOException;
 
+    abstract void closeConnection() throws IOException;
 
     /*
      * Run test
@@ -1341,16 +1292,14 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
         for (int i = startRow; i < lastRow; i++) {
           if (i % everyN != 0) continue;
           long startTime = System.nanoTime();
-          boolean requestSent = false;
-          try (TraceScope scope = TraceUtil.createTrace("test row");){
-            requestSent = testRow(i);
+          try (TraceScope scope = TraceUtil.createTrace("test row")){
+            testRow(i);
           }
           if ( (i - startRow) > opts.measureAfter) {
-            // If multiget or multiput is enabled, say set to 10, testRow() 
returns immediately
-            // first 9 times and sends the actual get request in the 10th 
iteration.
-            // We should only set latency when actual request is sent because 
otherwise
-            // it turns out to be 0.
-            if (requestSent) {
+            // If multiget is enabled, say set to 10, testRow() returns immediately first 9 times
+            // and sends the actual get request in the 10th iteration. We should only set latency
+            // when actual request is sent because otherwise it turns out to be 0.
+            if (opts.multiGet == 0 || (i - startRow + 1) % opts.multiGet == 0) {
               latencyHistogram.update((System.nanoTime() - startTime) / 1000);
             }
             if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
@@ -1375,15 +1324,11 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       return 
YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
     }
 
-
-    /**
-     * Test for individual row.
-     * @param i Row index.
-     * @return true if the row was sent to server and need to record metrics.
-     *         False if not, multiGet and multiPut e.g., the rows are sent
-     *         to server only if enough gets/puts are gathered.
-     */
-    abstract boolean testRow(final int i) throws IOException, 
InterruptedException;
+    /*
+    * Test for individual row.
+    * @param i Row index.
+    */
+    abstract void testRow(final int i) throws IOException, InterruptedException;
   }
 
   static abstract class Test extends TestBase {
@@ -1393,6 +1338,20 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       super(con == null ? HBaseConfiguration.create() : 
con.getConfiguration(), options, status);
       this.connection = con;
     }
+
+    @Override
+    void createConnection() throws IOException {
+      if (!opts.isOneCon()) {
+        this.connection = ConnectionFactory.createConnection(conf);
+      }
+    }
+
+    @Override
+    void closeConnection() throws IOException {
+      if (!opts.isOneCon()) {
+        this.connection.close();
+      }
+    }
   }
 
   static abstract class AsyncTest extends TestBase {
@@ -1402,6 +1361,24 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       super(con == null ? HBaseConfiguration.create() : 
con.getConfiguration(), options, status);
       this.connection = con;
     }
+
+    @Override
+    void createConnection() {
+      if (!opts.isOneCon()) {
+        try {
+          this.connection = ConnectionFactory.createAsyncConnection(conf).get();
+        } catch (InterruptedException | ExecutionException e) {
+          LOG.error("Failed to create async connection", e);
+        }
+      }
+    }
+
+    @Override
+    void closeConnection() throws IOException {
+      if (!opts.isOneCon()) {
+        this.connection.close();
+      }
+    }
   }
 
   static abstract class TableTest extends Test {
@@ -1454,7 +1431,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException, InterruptedException {
+    void testRow(final int i) throws IOException, InterruptedException {
       if (opts.randomSleep > 0) {
         Thread.sleep(rd.nextInt(opts.randomSleep));
       }
@@ -1483,8 +1460,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
                 this.table.get(this.gets).stream().map(f -> 
propagate(f::get)).toArray(Result[]::new);
             updateValueSize(rs);
             this.gets.clear();
-          } else {
-            return false;
           }
         } else {
           updateValueSize(this.table.get(get).get());
@@ -1492,7 +1467,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       } catch (ExecutionException e) {
         throw new IOException(e);
       }
-      return true;
     }
 
     public static RuntimeException runtime(Throwable e) {
@@ -1526,15 +1500,43 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
   }
 
-  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
-
+  static class AsyncRandomWriteTest extends AsyncTableTest {
     AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status 
status) {
       super(con, options, status);
     }
 
     @Override
-    protected byte[] generateRow(final int i) {
-      return getRandomRow(this.rand, opts.totalRows);
+    void testRow(final int i) throws IOException, InterruptedException {
+      byte[] row = getRandomRow(this.rand, opts.totalRows);
+      Put put = new Put(row);
+      for (int family = 0; family < opts.families; family++) {
+        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
+        for (int column = 0; column < opts.columns; column++) {
+          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
+          byte[] value = generateData(this.rand, getValueLength(this.rand));
+          if (opts.useTags) {
+            byte[] tag = generateData(this.rand, TAG_LENGTH);
+            Tag[] tags = new Tag[opts.noOfTags];
+            for (int n = 0; n < opts.noOfTags; n++) {
+              Tag t = new ArrayBackedTag((byte) n, tag);
+              tags[n] = t;
+            }
+            KeyValue kv =
+              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
+            put.add(kv);
+            updateValueSize(kv.getValueLength());
+          } else {
+            put.addColumn(familyName, qualifier, value);
+            updateValueSize(value.length);
+          }
+        }
+      }
+      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+      try {
+        table.put(put).get();
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
     }
   }
 
@@ -1563,7 +1565,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       if (this.testScanner == null) {
         Scan scan =
             new 
Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
@@ -1587,7 +1589,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       }
       Result r = testScanner.next();
       updateValueSize(r);
-      return true;
     }
   }
 
@@ -1597,7 +1598,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException, InterruptedException {
+    void testRow(final int i) throws IOException, InterruptedException {
       Get get = new Get(format(i));
       for (int family = 0; family < opts.families; family++) {
         byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@@ -1618,29 +1619,17 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       } catch (ExecutionException e) {
         throw new IOException(e);
       }
-      return true;
     }
   }
 
   static class AsyncSequentialWriteTest extends AsyncTableTest {
-    private ArrayList<Put> puts;
-
     AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status 
status) {
       super(con, options, status);
-      if (opts.multiPut > 0) {
-        LOG.info("MultiPut enabled. Sending PUTs in batches of " + 
opts.multiPut + ".");
-        this.puts = new ArrayList<>(opts.multiPut);
-      }
-    }
-
-    protected byte[] generateRow(final int i) {
-      return format(i);
     }
 
     @Override
-    @SuppressWarnings("ReturnValueIgnored")
-    boolean testRow(final int i) throws IOException, InterruptedException {
-      byte[] row = generateRow(i);
+    void testRow(final int i) throws IOException, InterruptedException {
+      byte[] row = format(i);
       Put put = new Put(row);
       for (int family = 0; family < opts.families; family++) {
         byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@@ -1667,21 +1656,9 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : 
Durability.SKIP_WAL);
       try {
         table.put(put).get();
-        if (opts.multiPut > 0) {
-          this.puts.add(put);
-          if (this.puts.size() == opts.multiPut) {
-            this.table.put(puts).stream().map(f -> 
AsyncRandomReadTest.propagate(f::get));
-            this.puts.clear();
-          } else {
-            return false;
-          }
-        } else {
-          table.put(put).get();
-        }
       } catch (ExecutionException e) {
         throw new IOException(e);
       }
-      return true;
     }
   }
 
@@ -1714,7 +1691,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       Scan scan = new Scan().withStartRow(getRandomRow(this.rand, 
opts.totalRows))
           .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
           .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
@@ -1745,7 +1722,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
         updateScanMetrics(s.getScanMetrics());
         s.close();
       }
-      return true;
     }
 
     @Override
@@ -1762,7 +1738,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
       Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
           .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
@@ -1799,7 +1775,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
         updateScanMetrics(s.getScanMetrics());
         s.close();
       }
-      return true;
     }
 
     protected abstract Pair<byte[],byte[]> getStartAndStopRow();
@@ -1876,7 +1851,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException, InterruptedException {
+    void testRow(final int i) throws IOException, InterruptedException {
       if (opts.randomSleep > 0) {
         Thread.sleep(rd.nextInt(opts.randomSleep));
       }
@@ -1903,13 +1878,10 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
           Result [] rs = this.table.get(this.gets);
           updateValueSize(rs);
           this.gets.clear();
-        } else {
-          return false;
         }
       } else {
         updateValueSize(this.table.get(get));
       }
-      return true;
     }
 
     @Override
@@ -1928,17 +1900,44 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
   }
 
-  static class RandomWriteTest extends SequentialWriteTest {
+  static class RandomWriteTest extends BufferedMutatorTest {
     RandomWriteTest(Connection con, TestOptions options, Status status) {
       super(con, options, status);
     }
 
     @Override
-    protected byte[] generateRow(final int i) {
-      return getRandomRow(this.rand, opts.totalRows);
+    void testRow(final int i) throws IOException {
+      byte[] row = getRandomRow(this.rand, opts.totalRows);
+      Put put = new Put(row);
+      for (int family = 0; family < opts.families; family++) {
+        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
+        for (int column = 0; column < opts.columns; column++) {
+          byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+          byte[] value = generateData(this.rand, getValueLength(this.rand));
+          if (opts.useTags) {
+            byte[] tag = generateData(this.rand, TAG_LENGTH);
+            Tag[] tags = new Tag[opts.noOfTags];
+            for (int n = 0; n < opts.noOfTags; n++) {
+              Tag t = new ArrayBackedTag((byte) n, tag);
+              tags[n] = t;
+            }
+            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
+              value, tags);
+            put.add(kv);
+            updateValueSize(kv.getValueLength());
+          } else {
+            put.addColumn(familyName, qualifier, value);
+            updateValueSize(value.length);
+          }
+        }
+      }
+      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+      if (opts.autoFlush) {
+        table.put(put);
+      } else {
+        mutator.mutate(put);
+      }
     }
-
-
   }
 
   static class ScanTest extends TableTest {
@@ -1958,7 +1957,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
 
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       if (this.testScanner == null) {
         Scan scan = new 
Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
             
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
@@ -1981,7 +1980,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       }
       Result r = testScanner.next();
       updateValueSize(r);
-      return true;
     }
   }
 
@@ -2021,7 +2019,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       Increment increment = new Increment(format(i));
       // unlike checkAndXXX tests, which make most sense to do on a single 
value,
       // if multiple families are specified for an increment test we assume it 
is
@@ -2031,7 +2029,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
         increment.addColumn(familyName, getQualifier(), 1l);
       }
       updateValueSize(this.table.increment(increment));
-      return true;
     }
   }
 
@@ -2041,7 +2038,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       byte [] bytes = format(i);
       Append append = new Append(bytes);
       // unlike checkAndXXX tests, which make most sense to do on a single 
value,
@@ -2052,7 +2049,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
         append.addColumn(familyName, getQualifier(), bytes);
       }
       updateValueSize(this.table.append(append));
-      return true;
     }
   }
 
@@ -2062,7 +2058,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       final byte [] bytes = format(i);
       // checkAndXXX tests operate on only a single value
       // Put a known value so when we go to check it, it is there.
@@ -2073,7 +2069,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       mutations.add(put);
       this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
           .ifEquals(bytes).thenMutate(mutations);
-      return true;
     }
   }
 
@@ -2083,7 +2078,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       final byte [] bytes = format(i);
       // checkAndXXX tests operate on only a single value
       // Put a known value so when we go to check it, it is there.
@@ -2092,7 +2087,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       this.table.put(put);
       this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
           .ifEquals(bytes).thenPut(put);
-      return true;
     }
   }
 
@@ -2102,7 +2096,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       final byte [] bytes = format(i);
       // checkAndXXX tests operate on only a single value
       // Put a known value so when we go to check it, it is there.
@@ -2113,7 +2107,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       delete.addColumn(FAMILY_ZERO, getQualifier());
       this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
           .ifEquals(bytes).thenDelete(delete);
-      return true;
     }
   }
 
@@ -2123,7 +2116,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
+    void testRow(final int i) throws IOException {
       Get get = new Get(format(i));
       for (int family = 0; family < opts.families; family++) {
         byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@@ -2140,29 +2133,17 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
         get.setFilter(new FilterAllFilter());
       }
       updateValueSize(table.get(get));
-      return true;
     }
   }
 
   static class SequentialWriteTest extends BufferedMutatorTest {
-    private ArrayList<Put> puts;
-
-
     SequentialWriteTest(Connection con, TestOptions options, Status status) {
       super(con, options, status);
-      if (opts.multiPut > 0) {
-        LOG.info("MultiPut enabled. Sending PUTs in batches of " + 
opts.multiPut + ".");
-        this.puts = new ArrayList<>(opts.multiPut);
-      }
-    }
-
-    protected byte[] generateRow(final int i) {
-      return format(i);
     }
 
     @Override
-    boolean testRow(final int i) throws IOException {
-      byte[] row = generateRow(i);
+    void testRow(final int i) throws IOException {
+      byte[] row = format(i);
       Put put = new Put(row);
       for (int family = 0; family < opts.families; family++) {
         byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
@@ -2188,21 +2169,10 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       }
       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : 
Durability.SKIP_WAL);
       if (opts.autoFlush) {
-        if (opts.multiPut > 0) {
-          this.puts.add(put);
-          if (this.puts.size() == opts.multiPut) {
-            table.put(this.puts);
-            this.puts.clear();
-          } else {
-            return false;
-          }
-        } else {
-          table.put(put);
-        }
+        table.put(put);
       } else {
         mutator.mutate(put);
       }
-      return true;
     }
   }
 
@@ -2214,7 +2184,7 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     }
 
     @Override
-    boolean testRow(int i) throws IOException {
+    void testRow(int i) throws IOException {
       byte[] value = generateData(this.rand, getValueLength(this.rand));
       Scan scan = constructScan(value);
       ResultScanner scanner = null;
@@ -2229,7 +2199,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
           scanner.close();
         }
       }
-      return true;
     }
 
     protected Scan constructScan(byte[] valuePrefix) throws IOException {
@@ -2411,11 +2380,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
     System.err.println(" nomapred        Run multiple clients using threads " +
       "(rather than use mapreduce)");
     System.err.println(" oneCon          all the threads share the same 
connection. Default: False");
-    System.err.println(" connCount          connections all threads share. "
-        + "For example, if set to 2, then all thread share 2 connection. "
-        + "Default: depend on oneCon parameter. if oneCon set to true, then 
connCount=1, "
-        + "if not, connCount=thread number");
-
     System.err.println(" sampleRate      Execute test on a sample of total " +
       "rows. Only supported by randomRead. Default: 1.0");
     System.err.println(" period          Report every 'period' rows: " +
@@ -2452,8 +2416,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
         "'valueSize' in zipf form: Default: Not set.");
     System.err.println(" writeToWAL      Set writeToWAL on puts. Default: 
True");
     System.err.println(" autoFlush       Set autoFlush on htable. Default: 
False");
-    System.err.println(" multiPut        Batch puts together into groups of N. 
Only supported " +
-        "by write. If multiPut is bigger than 0, autoFlush need to set to 
true. Default: 0");
     System.err.println(" presplit        Create presplit table. If a table 
with same name exists,"
         + " it'll be deleted and recreated (instead of verifying count of its 
existing regions). "
         + "Recommended for accurate perf analysis (see guide). Default: 
disabled");
@@ -2606,29 +2568,12 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
       final String autoFlush = "--autoFlush=";
       if (cmd.startsWith(autoFlush)) {
         opts.autoFlush = 
Boolean.parseBoolean(cmd.substring(autoFlush.length()));
-        if (!opts.autoFlush && opts.multiPut > 0) {
-          throw new IllegalArgumentException("autoFlush must be true when 
multiPut is more than 0");
-        }
         continue;
       }
 
       final String onceCon = "--oneCon=";
       if (cmd.startsWith(onceCon)) {
         opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
-        if (opts.oneCon && opts.connCount > 1) {
-          throw new IllegalArgumentException("oneCon is set to true, "
-              + "connCount should not bigger than 1");
-        }
-        continue;
-      }
-
-      final String connCount = "--connCount=";
-      if (cmd.startsWith(connCount)) {
-        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
-        if (opts.oneCon && opts.connCount > 1) {
-          throw new IllegalArgumentException("oneCon is set to true, "
-              + "connCount should not bigger than 1");
-        }
         continue;
       }
 
@@ -2644,15 +2589,6 @@ public class PerformanceEvaluation extends Configured 
implements Tool {
         continue;
       }
 
-      final String multiPut = "--multiPut=";
-      if (cmd.startsWith(multiPut)) {
-        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
-        if (!opts.autoFlush && opts.multiPut > 0) {
-          throw new IllegalArgumentException("autoFlush must be true when 
multiPut is more than 0");
-        }
-        continue;
-      }
-
       final String useTags = "--usetags=";
       if (cmd.startsWith(useTags)) {
         opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/4005a0c4/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
index 2e1ecd0..b2a89c8 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
@@ -243,50 +243,4 @@ public class TestPerformanceEvaluation {
       assertTrue(e.getCause() instanceof NoSuchElementException);
     }
   }
-
-  @Test
-  public void testParseOptsMultiPuts() {
-    Queue<String> opts = new LinkedList<>();
-    String cmdName = "sequentialWrite";
-    opts.offer("--multiPut=10");
-    opts.offer(cmdName);
-    opts.offer("64");
-    PerformanceEvaluation.TestOptions options = null;
-    try {
-      options = PerformanceEvaluation.parseOpts(opts);
-      fail("should fail");
-    } catch (IllegalArgumentException  e) {
-      System.out.println(e.getMessage());
-    }
-    ((LinkedList<String>) opts).offerFirst("--multiPut=10");
-    ((LinkedList<String>) opts).offerFirst("--autoFlush=true");
-    options = PerformanceEvaluation.parseOpts(opts);
-    assertNotNull(options);
-    assertNotNull(options.getCmdName());
-    assertEquals(cmdName, options.getCmdName());
-    assertTrue(options.getMultiPut() == 10);
-  }
-
-  @Test
-  public void testParseOptsConnCount() {
-    Queue<String> opts = new LinkedList<>();
-    String cmdName = "sequentialWrite";
-    opts.offer("--oneCon=true");
-    opts.offer("--connCount=10");
-    opts.offer(cmdName);
-    opts.offer("64");
-    PerformanceEvaluation.TestOptions options = null;
-    try {
-      options = PerformanceEvaluation.parseOpts(opts);
-      fail("should fail");
-    } catch (IllegalArgumentException  e) {
-      System.out.println(e.getMessage());
-    }
-    ((LinkedList<String>) opts).offerFirst("--connCount=10");
-    options = PerformanceEvaluation.parseOpts(opts);
-    assertNotNull(options);
-    assertNotNull(options.getCmdName());
-    assertEquals(cmdName, options.getCmdName());
-    assertTrue(options.getConnCount() == 10);
-  }
 }

Reply via email to