PHOENIX-3412 Used the batch JDBC APIs in pherf.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d7aea492 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d7aea492 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d7aea492 Branch: refs/heads/encodecolumns2 Commit: d7aea492984c72ac77be7dd1305b79e03347ea7a Parents: 7f5d79a Author: Josh Elser <els...@apache.org> Authored: Thu Mar 24 21:48:30 2016 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu Oct 27 14:35:10 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/phoenix/pherf/Pherf.java | 6 +++ .../phoenix/pherf/workload/WriteWorkload.java | 49 ++++++++++++++++++-- 2 files changed, 51 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7aea492/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java index 154d6ff..43061e0 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java @@ -91,6 +91,7 @@ public class Pherf { options.addOption("useAverageCompareType", false, "Compare results with Average query time instead of default is Minimum query time."); options.addOption("t", "thin", false, "Use the Phoenix Thin Driver"); options.addOption("s", "server", true, "The URL for the Phoenix QueryServer"); + options.addOption("b", "batchApi", false, "Use JDBC Batch API for writes"); } private final String zookeeper; @@ -166,6 +167,11 @@ public class Pherf { queryServerUrl = null; } + if (command.hasOption('b')) { + // If the '-b' option was provided, set the system property for WriteWorkload to pick up. + System.setProperty(WriteWorkload.USE_BATCH_API_PROPERTY, Boolean.TRUE.toString()); + } + if ((command.hasOption("h") || (args == null || args.length == 0)) && !command .hasOption("listFiles")) { hf.printHelp("Pherf", options); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7aea492/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java index e536eb9..69d35cc 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java @@ -52,6 +52,9 @@ import org.slf4j.LoggerFactory; public class WriteWorkload implements Workload { private static final Logger logger = LoggerFactory.getLogger(WriteWorkload.class); + + public static final String USE_BATCH_API_PROPERTY = "pherf.default.dataloader.batchApi"; + private final PhoenixUtil pUtil; private final XMLConfigParser parser; private final RulesApplier rulesApplier; @@ -64,6 +67,7 @@ public class WriteWorkload implements Workload { private final int threadPoolSize; private final int batchSize; private final GeneratePhoenixStats generateStatistics; + private final boolean useBatchApi; public WriteWorkload(XMLConfigParser parser) throws Exception { this(PhoenixUtil.create(), parser, GeneratePhoenixStats.NO); @@ -119,6 +123,9 @@ public class WriteWorkload implements Workload { int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool")); + // Should addBatch/executeBatch be used? Default: false + this.useBatchApi = Boolean.getBoolean(USE_BATCH_API_PROPERTY); + this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size; // TODO Move pool management up to WorkloadExecutor @@ -201,7 +208,7 @@ public class WriteWorkload implements Workload { Future<Info> write = upsertData(scenario, phxMetaCols, scenario.getTableName(), threadRowCount, - dataLoadThreadTime); + dataLoadThreadTime, this.useBatchApi); writeBatches.add(write); } if (writeBatches.isEmpty()) { @@ -234,7 +241,7 @@ public class WriteWorkload implements Workload { public Future<Info> upsertData(final Scenario scenario, final List<Column> columns, final String tableName, final int rowCount, - final DataLoadThreadTime dataLoadThreadTime) { + final DataLoadThreadTime dataLoadThreadTime, final boolean useBatchApi) { Future<Info> future = pool.submit(new Callable<Info>() { @Override public Info call() throws Exception { int rowsCreated = 0; @@ -257,8 +264,25 @@ public class WriteWorkload implements Workload { for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime) < maxDuration); i--) { stmt = buildStatement(scenario, columns, stmt, simpleDateFormat); - rowsCreated += stmt.executeUpdate(); + if (useBatchApi) { + stmt.addBatch(); + } else { + rowsCreated += stmt.executeUpdate(); + } if ((i % getBatchSize()) == 0) { + if (useBatchApi) { + int[] results = stmt.executeBatch(); + for (int x = 0; x < results.length; x++) { + int result = results[x]; + if (result < 1) { + final String msg = + "Failed to write update in batch (update count=" + + result + ")"; + throw new RuntimeException(msg); + } + rowsCreated += result; + } + } connection.commit(); duration = System.currentTimeMillis() - last; logger.info("Writer (" + Thread.currentThread().getName() @@ -280,10 +304,27 @@ public class WriteWorkload implements Workload { } } } finally { - if (stmt != null) { + // Need to keep the statement open to send the remaining batch of updates + if (!useBatchApi && stmt != null) { stmt.close(); } if (connection != null) { + if (useBatchApi && stmt != null) { + int[] results = stmt.executeBatch(); + for (int x = 0; x < results.length; x++) { + int result = results[x]; + if (result < 1) { + final String msg = + "Failed to write update in batch (update count=" + + result + ")"; + throw new RuntimeException(msg); + } + rowsCreated += result; + } + // Close the statement after our last batch execution. + stmt.close(); + } + try { connection.commit(); duration = System.currentTimeMillis() - start;