PHOENIX-1920 - Pherf - Add support for mixed r/w workloads
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/18070ccc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/18070ccc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/18070ccc Branch: refs/heads/4.4-HBase-0.98 Commit: 18070ccc12fb0448e82f088577ef6ab9b71a3f65 Parents: 1f5022c Author: cmarcel <cmar...@salesforce.com> Authored: Fri Jun 19 17:23:53 2015 -0700 Committer: cmarcel <cmar...@salesforce.com> Committed: Fri Jun 19 17:23:53 2015 -0700 ---------------------------------------------------------------------- phoenix-pherf/cluster/pherf.sh | 2 +- phoenix-pherf/pom.xml | 11 +- .../org/apache/phoenix/pherf/DataIngestIT.java | 134 ++++-- .../org/apache/phoenix/pherf/PherfMainIT.java | 36 ++ .../apache/phoenix/pherf/ResultBaseTestIT.java | 31 +- .../apache/phoenix/pherf/SchemaReaderIT.java | 17 +- .../java/org/apache/phoenix/pherf/Pherf.java | 179 +++++--- .../apache/phoenix/pherf/PherfConstants.java | 8 +- .../phoenix/pherf/configuration/DataModel.java | 10 - .../phoenix/pherf/configuration/Scenario.java | 12 +- .../pherf/configuration/WriteParams.java | 72 +++ .../pherf/configuration/XMLConfigParser.java | 25 +- .../phoenix/pherf/jmx/MonitorManager.java | 156 ++++--- .../phoenix/pherf/loaddata/DataLoader.java | 332 -------------- .../pherf/result/DataLoadThreadTime.java | 87 ++-- .../pherf/result/DataLoadTimeSummary.java | 54 +-- .../phoenix/pherf/result/DataModelResult.java | 68 ++- .../phoenix/pherf/result/QueryResult.java | 17 +- .../phoenix/pherf/result/QuerySetResult.java | 40 +- .../org/apache/phoenix/pherf/result/Result.java | 11 +- .../phoenix/pherf/result/ResultHandler.java | 5 + .../phoenix/pherf/result/ResultManager.java | 19 +- .../apache/phoenix/pherf/result/ResultUtil.java | 119 +++-- .../phoenix/pherf/result/ResultValue.java | 4 +- .../apache/phoenix/pherf/result/RunTime.java | 179 ++++---- .../phoenix/pherf/result/ScenarioResult.java | 44 +- .../apache/phoenix/pherf/result/ThreadTime.java | 34 +- .../phoenix/pherf/result/file/Extension.java | 3 +- .../phoenix/pherf/result/file/Header.java | 11 +- .../pherf/result/impl/CSVResultHandler.java | 47 +- .../pherf/result/impl/ImageResultHandler.java | 58 +-- .../pherf/result/impl/XMLResultHandler.java | 36 +- .../phoenix/pherf/schema/SchemaReader.java | 2 +- .../apache/phoenix/pherf/util/PhoenixUtil.java | 64 ++- .../pherf/workload/MultiThreadedRunner.java | 153 +++++++ .../pherf/workload/MultithreadedDiffer.java | 131 +++--- .../pherf/workload/MultithreadedRunner.java | 170 ------- .../phoenix/pherf/workload/QueryExecutor.java | 459 ++++++++++--------- .../phoenix/pherf/workload/QueryVerifier.java | 265 +++++------ .../apache/phoenix/pherf/workload/Workload.java | 10 + .../pherf/workload/WorkloadExecutor.java | 109 ++--- .../phoenix/pherf/workload/WriteWorkload.java | 403 ++++++++++++++++ .../scenario/prod_test_unsalted_scenario.xml | 35 ++ .../phoenix/pherf/ConfigurationParserTest.java | 102 +++-- .../org/apache/phoenix/pherf/ResultTest.java | 9 +- .../apache/phoenix/pherf/RuleGeneratorTest.java | 15 +- .../test/resources/scenario/test_scenario.xml | 58 ++- phoenix-pherf/standalone/pherf.sh | 2 +- 48 files changed, 2175 insertions(+), 1673 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/cluster/pherf.sh ---------------------------------------------------------------------- diff --git a/phoenix-pherf/cluster/pherf.sh b/phoenix-pherf/cluster/pherf.sh index aeff856..8d58dfe 100755 --- a/phoenix-pherf/cluster/pherf.sh +++ b/phoenix-pherf/cluster/pherf.sh @@ -28,6 +28,6 @@ for f in $PHERF_HOME/lib/*.jar; do CLASSPATH=${CLASSPATH}:$f; done -CMD="time $}JAVA_HOME}/bin/java ${REMOTE_DEBUG} -Dapp.home=${PHERF_HOME} ${ENV_PROPS} -Xms512m -Xmx3072m -cp ${CLASSPATH} org.apache.phoenix.pherf.Pherf ${@}" +CMD="time ${JAVA_HOME}/bin/java ${REMOTE_DEBUG} -Dapp.home=${PHERF_HOME} ${ENV_PROPS} -Xms512m -Xmx3072m -cp ${CLASSPATH} org.apache.phoenix.pherf.Pherf ${@}" eval $CMD \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml index 8868138..0facbde 100644 --- a/phoenix-pherf/pom.xml +++ b/phoenix-pherf/pom.xml @@ -16,13 +16,14 @@ ~ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix</artifactId> - <version>4.4.1-HBase-0.98-SNAPSHOT</version> + <version>4.5.0-SNAPSHOT</version> </parent> <artifactId>phoenix-pherf</artifactId> @@ -30,6 +31,7 @@ <name>Phoenix - Pherf</name> <properties> + <top.dir>${project.basedir}/..</top.dir> </properties> <profiles> @@ -232,6 +234,11 @@ <!-- Test Dependencies --> <dependency> + <groupId>com.jcabi</groupId> + <artifactId>jcabi-jdbc</artifactId> + <version>0.15</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java index 2b56f43..828ac38 100644 --- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java @@ -18,70 +18,122 @@ package org.apache.phoenix.pherf; +import com.jcabi.jdbc.JdbcSession; +import com.jcabi.jdbc.Outcome; import org.apache.phoenix.pherf.configuration.Column; +import org.apache.phoenix.pherf.configuration.DataModel; import org.apache.phoenix.pherf.configuration.DataTypeMapping; import org.apache.phoenix.pherf.configuration.Scenario; -import org.apache.phoenix.pherf.configuration.XMLConfigParser; -import org.apache.phoenix.pherf.loaddata.DataLoader; import org.apache.phoenix.pherf.rules.DataValue; import org.apache.phoenix.pherf.rules.RulesApplier; -import org.apache.phoenix.pherf.schema.SchemaReader; -import org.apache.phoenix.pherf.util.PhoenixUtil; +import org.apache.phoenix.pherf.workload.QueryExecutor; +import org.apache.phoenix.pherf.workload.WorkloadExecutor; +import org.apache.phoenix.pherf.workload.WriteWorkload; +import org.junit.Before; import org.junit.Test; -import java.nio.file.Path; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public class DataIngestIT extends ResultBaseTestIT { - protected static PhoenixUtil util = new PhoenixUtil(true); - static final String matcherScenario = ".*scenario/.*test.*xml"; - static final String matcherSchema = ".*datamodel/.*test.*sql"; - @Test - public void generateData() throws Exception { - util.setZookeeper("localhost"); - SchemaReader reader = new SchemaReader(util, matcherSchema); - XMLConfigParser parser = new XMLConfigParser(matcherScenario); + @Before + public void applySchema() throws Exception { + reader.applySchema(); + resources = new ArrayList<>(reader.getResourceList()); - // 1. Generate table schema from file - List<Path> resources = new ArrayList<>(reader.getResourceList()); assertTrue("Could not pull list of schema files.", resources.size() > 0); assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0))); - reader.applySchema(); + } + + @Test + public void testColumnRulesApplied() { + + Scenario scenario = null; + try { + scenario = parser.getScenarioByName("testScenario"); + List<Column> + columnListFromPhoenix = + util.getColumnsFromPhoenix(scenario.getSchemaName(), + scenario.getTableNameWithoutSchemaName(), util.getConnection()); + assertTrue("Could not get phoenix columns.", columnListFromPhoenix.size() > 0); + + WriteWorkload loader = new WriteWorkload(util, parser, scenario); + WorkloadExecutor executor = new WorkloadExecutor(); + executor.add(loader); + + RulesApplier rulesApplier = loader.getRulesApplier(); + List<Map> modelList = rulesApplier.getModelList(); + assertTrue("Could not generate the modelList", modelList.size() > 0); + + for (Column column : columnListFromPhoenix) { + DataValue data = rulesApplier.getDataForRule(scenario, column); - // 2. Load the metadata of for the test tables - Scenario scenario = parser.getScenarios().get(0); - List<Column> columnListFromPhoenix = util.getColumnsFromPhoenix(scenario.getSchemaName(), scenario.getTableNameWithoutSchemaName(), util.getConnection()); - assertTrue("Could not get phoenix columns.", columnListFromPhoenix.size() > 0); - DataLoader loader = new DataLoader(util,parser); - RulesApplier rulesApplier = loader.getRulesApplier(); - List<Map> modelList = rulesApplier.getModelList(); - assertTrue("Could not generate the modelList", modelList.size() > 0); - - for (Column column : columnListFromPhoenix) { - DataValue data = rulesApplier.getDataForRule(scenario, column); - - // We are generating data values so the value should have been specified by this point. - assertTrue("Failed to retrieve data for column type: " + column.getType(), data != null); - - // Test that we still retrieve the GENERAL_CHAR rule even after an override is applied to another CHAR type. - // NEWVAL_STRING Column does not specify an override so we should get the default rule. - if ((column.getType() == DataTypeMapping.VARCHAR) && (column.getName().equals("NEWVAL_STRING"))) { - assertTrue("Failed to retrieve data for column type: ", data.getDistribution() == Integer.MIN_VALUE); + // We are generating data values + // so the value should have been specified by this point. + assertTrue("Failed to retrieve data for column type: " + column.getType(), + data != null); + + // Test that we still retrieve the GENERAL_CHAR rule even after an override is + // applied to another CHAR type. NEWVAL_STRING Column does not specify an override + // so we should get the default rule. + if ((column.getType() == DataTypeMapping.VARCHAR) && (column.getName() + .equals("NEWVAL_STRING"))) { + assertTrue("Failed to retrieve data for column type: ", + data.getDistribution() == Integer.MIN_VALUE); + } } + } catch (Exception e) { + fail("We had an exception: " + e.getMessage()); } + } + + @Test + public void testRWWorkload() throws Exception { + + Connection connection = util.getConnection(); + + WorkloadExecutor executor = new WorkloadExecutor(); + DataModel dataModel = parser.getDataModelByName("test_scenario"); + List<DataModel> dataModels = new ArrayList<>(); + dataModels.add(dataModel); + QueryExecutor + qe = + new QueryExecutor(parser, util, executor.getPool(), dataModels, null, false, + PherfConstants.RunMode.PERFORMANCE); + executor.add(qe); + Scenario scenario = parser.getScenarioByName("testScenarioRW"); + + String sql = "select count(*) from " + scenario.getTableName(); - // Load up the data. try { - loader.execute(); + // Wait for data to load up. + executor.get(); + executor.shutdown(); + + // Verify data has been loaded + Integer count = new JdbcSession(connection).sql(sql).select(new Outcome<Integer>() { + @Override public Integer handle(ResultSet resultSet, Statement statement) + throws SQLException { + while (resultSet.next()) { + return resultSet.getInt(1); + } + return null; + } + }); + assertNotNull("Could not retrieve count. " + count); + + // It would be better to sum up all the rowcounts for the scenarios, but this is fine + assertTrue("Could not query any rows for in " + scenario.getTableName(), count > 0); } catch (Exception e) { - fail("Failed to lead data. An exception was thrown: " + e.getMessage()); + fail("Failed to load data. An exception was thrown: " + e.getMessage()); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java new file mode 100644 index 0000000..2407ef4 --- /dev/null +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.ExpectedSystemExit; + +public class PherfMainIT extends ResultBaseTestIT { + @Rule + public final ExpectedSystemExit exit = ExpectedSystemExit.none(); + + @Test + public void testPherfMain() { + String[] args = { "-q", + "--scenarioFile", ".*prod_test_unsalted_scenario.*", + "-m", "--monitorFrequency", "10" }; + Pherf.main(args); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java index 6e103b8..d2c5173 100644 --- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java @@ -19,27 +19,38 @@ package org.apache.phoenix.pherf; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.pherf.configuration.XMLConfigParser; import org.apache.phoenix.pherf.result.ResultUtil; +import org.apache.phoenix.pherf.schema.SchemaReader; +import org.apache.phoenix.pherf.util.PhoenixUtil; import org.junit.BeforeClass; +import java.nio.file.Path; +import java.util.List; import java.util.Properties; public class ResultBaseTestIT extends BaseHBaseManagedTimeIT { - private static boolean isSetUpDone = false; + protected static final String matcherScenario = ".*scenario/.*test.*xml"; + protected static final String matcherSchema = ".*datamodel/.*test.*sql"; - @BeforeClass - public static void setUp() throws Exception { - if (isSetUpDone) { - return; - } + protected static PhoenixUtil util = PhoenixUtil.create(true); + protected static Properties properties; + protected static SchemaReader reader; + protected static XMLConfigParser parser; + protected static List<Path> resources; + protected static ResultUtil resultUtil = new ResultUtil(); + + @BeforeClass public static void setUp() throws Exception { - ResultUtil util = new ResultUtil(); PherfConstants constants = PherfConstants.create(); - Properties properties = constants.getProperties(PherfConstants.PHERF_PROPERTIES); + properties = constants.getProperties(PherfConstants.PHERF_PROPERTIES); String dir = properties.getProperty("pherf.default.results.dir"); String targetDir = "target/" + dir; properties.setProperty("pherf.default.results.dir", targetDir); - util.ensureBaseDirExists(targetDir); - isSetUpDone = true; + resultUtil.ensureBaseDirExists(targetDir); + + util.setZookeeper("localhost"); + reader = new SchemaReader(util, matcherSchema); + parser = new XMLConfigParser(matcherScenario); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java index 2cb7c13..bce1e91 100644 --- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java @@ -34,15 +34,12 @@ import java.sql.Connection; import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public class SchemaReaderIT extends BaseHBaseManagedTimeIT { - protected static PhoenixUtil util = new PhoenixUtil(true); + protected static PhoenixUtil util = PhoenixUtil.create(true); - @Test - public void testSchemaReader() { + @Test public void testSchemaReader() { // Test for the unit test version of the schema files. assertApplySchemaTest(); } @@ -55,7 +52,8 @@ public class SchemaReaderIT extends BaseHBaseManagedTimeIT { List<Path> resources = new ArrayList<>(reader.getResourceList()); assertTrue("Could not pull list of schema files.", resources.size() > 0); assertNotNull("Could not read schema file.", this.getClass().getResourceAsStream( - PherfConstants.RESOURCE_DATAMODEL + "/" + resources.get(0).getFileName().toString())); + PherfConstants.RESOURCE_DATAMODEL + "/" + resources.get(0).getFileName() + .toString())); assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0))); reader.applySchema(); @@ -67,7 +65,10 @@ public class SchemaReaderIT extends BaseHBaseManagedTimeIT { DataModel data = XMLConfigParser.readDataModel(resourcePath); List<Scenario> scenarioList = data.getScenarios(); Scenario scenario = scenarioList.get(0); - List<Column> columnList = util.getColumnsFromPhoenix(scenario.getSchemaName(), scenario.getTableNameWithoutSchemaName(), connection); + List<Column> + columnList = + util.getColumnsFromPhoenix(scenario.getSchemaName(), + scenario.getTableNameWithoutSchemaName(), connection); assertTrue("Could not retrieve Metadata from Phoenix", columnList.size() > 0); } catch (Exception e) { fail("Could not initialize SchemaReader"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java index 073c661..5a9f45f 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java @@ -18,44 +18,61 @@ package org.apache.phoenix.pherf; +import org.apache.commons.cli.*; import org.apache.phoenix.pherf.configuration.XMLConfigParser; +import org.apache.phoenix.pherf.jmx.MonitorManager; import org.apache.phoenix.pherf.schema.SchemaReader; import org.apache.phoenix.pherf.util.PhoenixUtil; import org.apache.phoenix.pherf.util.ResourceList; +import org.apache.phoenix.pherf.workload.QueryExecutor; +import org.apache.phoenix.pherf.workload.Workload; import org.apache.phoenix.pherf.workload.WorkloadExecutor; - -import org.apache.commons.cli.*; +import org.apache.phoenix.pherf.workload.WriteWorkload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Properties; public class Pherf { private static final Logger logger = LoggerFactory.getLogger(Pherf.class); private static final Options options = new Options(); + private final PhoenixUtil phoenixUtil = PhoenixUtil.create(); static { + options.addOption("disableSchemaApply", false, "Set to disable schema from being applied."); + options.addOption("z", "zookeeper", true, + "HBase Zookeeper address for connection. Default: localhost"); + options.addOption("q", "query", false, "Executes multi-threaded query sets"); + options.addOption("listFiles", false, "List available resource files"); + options.addOption("l", "load", false, + "Pre-loads data according to specified configuration values."); + options.addOption("scenarioFile", true, + "Regex or file name for the Test Scenario configuration .xml file to use."); + options.addOption("drop", true, "Regex drop all tables with schema name as PHERF. " + + "\nExample drop Event tables: -drop .*(EVENT).* Drop all: -drop .* or -drop all"); + options.addOption("schemaFile", true, + "Regex or file name for the Test phoenix table schema .sql to use."); options.addOption("m", "monitor", false, "Launch the stats profilers"); - options.addOption("monitorFrequency", true, "Override for frequency in Ms for which monitor should log stats. " + - "\n See pherf.default.monitorFrequency in pherf.properties"); - options.addOption("d", "debug", false, "Put tool in debug mode"); - options.addOption("z", "zookeeper", true, "HBase Zookeeper address for connection. Default: localhost"); - options.addOption("l", "load", false, "Loads data according to specified configuration values."); - options.addOption("scenarioFile", true, "Regex or file name for the Test Scenario configuration .xml file to use."); - options.addOption("drop", true, "Regex drop all tables with schema name as PHERF. " + - "\nExample drop Event tables: -drop .*(EVENT).* Drop all: -drop .* or -drop all"); - options.addOption("schemaFile", true, "Regex or file name for the Test phoenix table schema .sql to use."); - options.addOption("rowCountOverride", true, "Row count override to use instead of one specified in scenario."); + options.addOption("monitorFrequency", true, + "Override for frequency in Ms for which monitor should log stats. " + + "\n See pherf.default.monitorFrequency in pherf.properties"); + options.addOption("rowCountOverride", true, + "Row count override to use instead of one specified in scenario."); options.addOption("hint", true, "Executes all queries with specified hint. Example SMALL"); - options.addOption("diff", false, "Run pherf in verification mode and diff with exported results"); - options.addOption("export", false, "Exports query results to CSV files in " + PherfConstants.EXPORT_DIR + " directory"); - options.addOption("listFiles", false, "List available resource files"); - options.addOption("writerThreadSize", true, "Override the default number of writer threads. " + - "See pherf.default.dataloader.threadpool in Pherf.properties."); - options.addOption("q", "query", false, "Executes multi-threaded query sets"); + options.addOption("diff", false, + "Run pherf in verification mode and diff with exported results"); + options.addOption("export", false, + "Exports query results to CSV files in " + PherfConstants.EXPORT_DIR + + " directory"); + options.addOption("writerThreadSize", true, + "Override the default number of writer threads. " + + "See pherf.default.dataloader.threadpool in Pherf.properties."); options.addOption("h", "help", false, "Get help on using this utility."); + options.addOption("d", "debug", false, "Put tool in debug mode"); } private final String zookeeper; @@ -63,14 +80,15 @@ public class Pherf { private final String schemaFile; private final String queryHint; private final Properties properties; - private final boolean loadData; + private final boolean preLoadData; private final String dropPherfTablesRegEx; private final boolean executeQuerySets; private final boolean exportCSV; private final boolean diff; private final boolean monitor; private final int rowCountOverride; - private final boolean listFiles; + private final boolean listFiles; + private final boolean applySchema; public Pherf(String[] args) throws Exception { CommandLineParser parser = new PosixParser(); @@ -87,30 +105,35 @@ public class Pherf { properties = PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES); dropPherfTablesRegEx = command.getOptionValue("drop", null); monitor = command.hasOption("m"); - String monitorFrequency = (command.hasOption("m") && command.hasOption("monitorFrequency")) - ? command.getOptionValue("monitorFrequency") - : properties.getProperty("pherf.default.monitorFrequency"); + String + monitorFrequency = + (command.hasOption("m") && command.hasOption("monitorFrequency")) ? + command.getOptionValue("monitorFrequency") : + properties.getProperty("pherf.default.monitorFrequency"); properties.setProperty("pherf.default.monitorFrequency", monitorFrequency); logger.debug("Using Monitor: " + monitor); logger.debug("Monitor Frequency Ms:" + monitorFrequency); - loadData = command.hasOption("l"); + preLoadData = command.hasOption("l"); executeQuerySets = command.hasOption("q"); zookeeper = command.getOptionValue("z", "localhost"); queryHint = command.getOptionValue("hint", null); exportCSV = command.hasOption("export"); diff = command.hasOption("diff"); listFiles = command.hasOption("listFiles"); - scenarioFile = command.hasOption("scenarioFile") ? command.getOptionValue("scenarioFile") : null; + applySchema = !command.hasOption("disableSchemaApply"); + scenarioFile = + command.hasOption("scenarioFile") ? command.getOptionValue("scenarioFile") : null; schemaFile = command.hasOption("schemaFile") ? command.getOptionValue("schemaFile") : null; rowCountOverride = Integer.parseInt(command.getOptionValue("rowCountOverride", "0")); - String writerThreadPoolSize = command.getOptionValue("writerThreadSize", - properties.getProperty("pherf.default.dataloader.threadpool")); + String + writerThreadPoolSize = + command.getOptionValue("writerThreadSize", + properties.getProperty("pherf.default.dataloader.threadpool")); properties.setProperty("pherf. default.dataloader.threadpool", writerThreadPoolSize); - - if ((command.hasOption("h") || (args == null || args.length == 0)) - && !command.hasOption("listFiles")) { + if ((command.hasOption("h") || (args == null || args.length == 0)) && !command + .hasOption("listFiles")) { hf.printHelp("Pherf", options); System.exit(1); } @@ -128,17 +151,22 @@ public class Pherf { } public void run() throws Exception { - WorkloadExecutor workloadExec = null; + MonitorManager monitorManager = null; + List<Workload> workloads = new ArrayList<>(); + WorkloadExecutor workloadExecutor = new WorkloadExecutor(properties, workloads); try { if (listFiles) { ResourceList list = new ResourceList(PherfConstants.RESOURCE_DATAMODEL); - Collection<Path> schemaFiles = list.getResourceList(PherfConstants.SCHEMA_ROOT_PATTERN + ".sql"); + Collection<Path> + schemaFiles = + list.getResourceList(PherfConstants.SCHEMA_ROOT_PATTERN + ".sql"); System.out.println("Schema Files:"); for (Path path : schemaFiles) { System.out.println(path); } list = new ResourceList(PherfConstants.RESOURCE_SCENARIO); - Collection<Path> scenarioFiles = + Collection<Path> + scenarioFiles = list.getResourceList(PherfConstants.SCENARIO_ROOT_PATTERN + ".xml"); System.out.println("Scenario Files:"); for (Path path : scenarioFiles) { @@ -146,49 +174,86 @@ public class Pherf { } return; } - workloadExec = (scenarioFile == null) - ? new WorkloadExecutor(properties, - new XMLConfigParser(PherfConstants.DEFAULT_FILE_PATTERN), - monitor) - : new WorkloadExecutor(properties, - new XMLConfigParser(scenarioFile), - monitor); + XMLConfigParser parser = new XMLConfigParser(scenarioFile); // Drop tables with PHERF schema and regex comparison if (null != dropPherfTablesRegEx) { - logger.info("\nDropping existing table with PHERF namename and " - + dropPherfTablesRegEx + " regex expression."); - new PhoenixUtil().deleteTables(dropPherfTablesRegEx); + logger.info( + "\nDropping existing table with PHERF namename and " + dropPherfTablesRegEx + + " regex expression."); + phoenixUtil.deleteTables(dropPherfTablesRegEx); } - // Schema and Data Load - if (loadData) { + if (monitor) { + monitorManager = + new MonitorManager(Integer.parseInt( + properties.getProperty("pherf.default.monitorFrequency"))); + workloadExecutor.add(monitorManager); + } + + if (applySchema) { logger.info("\nStarting to apply schema..."); - SchemaReader reader = (schemaFile == null) - ? new SchemaReader(".*.sql") - : new SchemaReader(schemaFile); + SchemaReader + reader = + (schemaFile == null) ? + new SchemaReader(".*.sql") : + new SchemaReader(schemaFile); reader.applySchema(); + } + // Schema and Data Load + if (preLoadData) { logger.info("\nStarting Data Load..."); - workloadExec.executeDataLoad(); + WriteWorkload workload = new WriteWorkload(parser); + workloadExecutor.add(workload); + + // Wait for dataLoad to complete + workloadExecutor.get(workload); logger.info("\nGenerate query gold files after data load"); - workloadExec.executeMultithreadedQueryExecutor(queryHint, true, PherfConstants.RunMode.FUNCTIONAL); + QueryExecutor + goldFileGenerator = + new QueryExecutor(parser, phoenixUtil, workloadExecutor.getPool(), + parser.getDataModels(), queryHint, true, + PherfConstants.RunMode.FUNCTIONAL); + workloadExecutor + .add(goldFileGenerator); + + // Wait for dataLoad to complete + workloadExecutor.get(goldFileGenerator); } else { - logger.info("\nSKIPPED: Data Load and schema creation as -l argument not specified"); + logger.info( + "\nSKIPPED: Data Load and schema creation as -l argument not specified"); } // Execute multi-threaded query sets if (executeQuerySets) { - logger.info("\nStarting to apply schema..."); - workloadExec.executeMultithreadedQueryExecutor(queryHint, exportCSV, diff ? PherfConstants.RunMode.FUNCTIONAL : PherfConstants.RunMode.PERFORMANCE); + logger.info("\nStarting to apply Execute Queries..."); + + workloadExecutor + .add(new QueryExecutor(parser, phoenixUtil, workloadExecutor.getPool(), + parser.getDataModels(), queryHint, exportCSV, diff ? + PherfConstants.RunMode.FUNCTIONAL : + PherfConstants.RunMode.PERFORMANCE)); + } else { - logger.info("\nSKIPPED: Multithreaded query set execution as -q argument not specified"); + logger.info( + "\nSKIPPED: Multithreaded query set execution as -q argument not specified"); + } + + // Clean up the monitor explicitly + if (monitorManager != null) { + logger.info("Run completed. Shutting down Monitor."); + monitorManager.complete(); } + + // Collect any final jobs + workloadExecutor.get(); + } finally { - if (workloadExec != null) { - logger.info("Run completed. Shutting down Monitor if it was running."); - workloadExec.shutdown(); + if (workloadExecutor != null) { + logger.info("Run completed. Shutting down thread pool."); + workloadExecutor.shutdown(); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java index 493f5a8..e060e53 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java @@ -28,14 +28,13 @@ public class PherfConstants { public static final int DEFAULT_THREAD_POOL_SIZE = 10; public static final int DEFAULT_BATCH_SIZE = 1000; public static final String DEFAULT_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; - public static final String DEFAULT_FILE_PATTERN = ".*scenario.xml"; public static final String RESOURCE_SCENARIO = "/scenario"; public static final String SCENARIO_ROOT_PATTERN = ".*" + PherfConstants.RESOURCE_SCENARIO.substring(1) + ".*"; public static final String SCHEMA_ROOT_PATTERN = ".*"; public static final String PHERF_PROPERTIES = "pherf.properties"; -// public static final String RESULT_DIR = "RESULTS"; + public static final String EXPORT_DIR = "CSV_EXPORT"; public static final String RESULT_PREFIX = "RESULT_"; public static final String PATH_SEPARATOR = "/"; @@ -51,6 +50,7 @@ public class PherfConstants { public static final String PHERF_SCHEMA_NAME = "PHERF"; + // TODO MOve to properties // log out data load per n rows public static final int LOG_PER_NROWS = 1000000; public static final String COMBINED_FILE_NAME = "COMBINED"; @@ -86,7 +86,9 @@ public class PherfConstants { InputStream is = null; try { is = getClass().getClassLoader().getResourceAsStream(fileName); - properties.load(is); + if (is != null) { + properties.load(is); + } } finally { if (is != null) { is.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java index 25c0df1..8eb42ff 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java @@ -26,7 +26,6 @@ import java.util.List; @XmlRootElement(name = "datamodel") public class DataModel { - private String release; private String name; private List<Scenario> scenarios; private List<Column> dataMappingColumns; @@ -34,15 +33,6 @@ public class DataModel { public DataModel() { } - public String getRelease() { - return this.release; - } - - @XmlAttribute() - public void setRelease(String release) { - this.release = release; - } - public List<Scenario> getScenarios() { return scenarios; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java index d2f113a..7de96cc 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java @@ -34,10 +34,12 @@ public class Scenario { private int rowCount; private Map<String, String> phoenixProperties; private DataOverride dataOverride; - private List<QuerySet> querySet = new ArrayList<QuerySet>(); + private List<QuerySet> querySet = new ArrayList<>(); + private WriteParams writeParams; private String name; public Scenario() { + writeParams = new WriteParams(); } /** @@ -161,6 +163,14 @@ public class Scenario { this.name = name; } + public WriteParams getWriteParams() { + return writeParams; + } + + public void setWriteParams(WriteParams writeParams) { + this.writeParams = writeParams; + } + @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java new file mode 100644 index 0000000..04be239 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.configuration; + +import javax.xml.bind.annotation.XmlAttribute; + +public class WriteParams { + private int writerThreadCount; + private long threadSleepDuration; + private long batchSize; + private long executionDurationInMs; + + public WriteParams() { + this.batchSize = Long.MIN_VALUE; + this.writerThreadCount = Integer.MIN_VALUE; + this.threadSleepDuration = Long.MIN_VALUE; + this.executionDurationInMs = Long.MAX_VALUE; + } + + public long getThreadSleepDuration() { + return threadSleepDuration; + } + + @SuppressWarnings("unused") + public void setThreadSleepDuration(long threadSleepDuration) { + this.threadSleepDuration = threadSleepDuration; + } + + public long getBatchSize() { + return batchSize; + } + + @SuppressWarnings("unused") + public void setBatchSize(long batchSize) { + this.batchSize = batchSize; + } + + public int getWriterThreadCount() { + return writerThreadCount; + } + + @SuppressWarnings("unused") + public void setWriterThreadCount(int writerThreadCount) { + this.writerThreadCount = writerThreadCount; + } + + @XmlAttribute() + public long getExecutionDurationInMs() { + return executionDurationInMs; + } + + @SuppressWarnings("unused") + public void setExecutionDurationInMs(long executionDurationInMs) { + this.executionDurationInMs = executionDurationInMs; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java index 9b5a9e9..393fa7e 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java @@ -52,6 +52,24 @@ public class XMLConfigParser { return dataModels; } + public DataModel getDataModelByName(String name) { + for (DataModel dataModel : getDataModels()) { + if (dataModel.getName().equals(name)) { + return dataModel; + } + } + return null; + } + + public Scenario getScenarioByName(String name) throws Exception { + for (Scenario scenario : getScenarios()) { + if (scenario.getName().equals(name)) { + return scenario; + } + } + return null; + } + public synchronized Collection<Path> getPaths(String strPattern) throws Exception { if (paths != null) { return paths; @@ -87,7 +105,8 @@ public class XMLConfigParser { * Unmarshall an XML data file * * @param file Name of File - * @return + * @return {@link org.apache.phoenix.pherf.configuration.DataModel} Returns DataModel from + * XML configuration * @throws JAXBException */ // TODO Remove static calls @@ -151,8 +170,6 @@ public class XMLConfigParser { } private Collection<Path> getResources(String pattern) throws Exception { - Collection<Path> resourceFiles = new ArrayList<Path>(); - resourceFiles = resourceList.getResourceList(pattern); - return resourceFiles; + return resourceList.getResourceList(pattern); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java index 9f46cf7..5b39b2b 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java @@ -21,48 +21,54 @@ package org.apache.phoenix.pherf.jmx; import org.apache.phoenix.pherf.PherfConstants; import org.apache.phoenix.pherf.exception.FileLoaderRuntimeException; import org.apache.phoenix.pherf.jmx.monitors.Monitor; -import org.apache.phoenix.pherf.result.file.ResultFileDetails; -import org.apache.phoenix.pherf.result.impl.CSVResultHandler; import org.apache.phoenix.pherf.result.Result; import org.apache.phoenix.pherf.result.ResultHandler; +import org.apache.phoenix.pherf.result.file.ResultFileDetails; +import org.apache.phoenix.pherf.result.impl.CSVResultHandler; +import org.apache.phoenix.pherf.workload.Workload; import org.apache.phoenix.util.DateUtil; -import javax.management.*; +import javax.management.InstanceAlreadyExistsException; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.StandardMBean; import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** - * This class starts JMX stats for the configured monitors. Monitors should be configured in MonitorDetails Enum. + * This class starts JMX stats for the configured monitors. + * Monitors should be configured in MonitorDetails Enum. * Each stat implements {@link org.apache.phoenix.pherf.jmx.monitors.Monitor}. * - * For the duration of any Pherf run, when the configured {@link org.apache.phoenix.pherf.PherfConstants#MONITOR_FREQUENCY} - * is reached a snapshot of each monitor is taken and dumped out to a log file. + * For the duration of any Pherf run, when the configured + * {@link org.apache.phoenix.pherf.PherfConstants#MONITOR_FREQUENCY} is reached a snapshot of + * each monitor is taken and dumped out to a log file. */ -public class MonitorManager implements Runnable { +public class MonitorManager implements Workload { // List of MonitorDetails for all the running monitors. // TODO Move this out to config. Possible use Guice and use IOC to inject it in. - private static final List<MonitorDetails> MONITOR_DETAILS_LIST = + private static final List<MonitorDetails> + MONITOR_DETAILS_LIST = Arrays.asList(MonitorDetails.values()); private final ResultHandler resultHandler; - private final long monitorFrequency; - private AtomicLong rowCount; - private volatile boolean shouldStop = false; - private volatile boolean isRunning = false; + private final AtomicLong monitorFrequency; + private final AtomicLong rowCount; + private final AtomicBoolean shouldStop = new AtomicBoolean(false); + private final AtomicBoolean isRunning = new AtomicBoolean(false); - @SuppressWarnings("unused") - public MonitorManager() throws Exception { + @SuppressWarnings("unused") public MonitorManager() throws Exception { this(PherfConstants.MONITOR_FREQUENCY); } /** - * * @param monitorFrequency Frequency at which monitor stats are written to a log file. * @throws Exception */ public MonitorManager(long monitorFrequency) throws Exception { - this.monitorFrequency = monitorFrequency; + this.monitorFrequency = new AtomicLong(monitorFrequency); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); // Register all the monitors to JMX @@ -77,75 +83,87 @@ public class MonitorManager implements Runnable { } } rowCount = new AtomicLong(0); - this.resultHandler = new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV); + this.resultHandler = + new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV); } - @Override - public void run() { - try { - while (!shouldStop()) { - isRunning = true; - List rowValues = new ArrayList<String>(); - synchronized (resultHandler) { - for (MonitorDetails monitorDetails : MONITOR_DETAILS_LIST) { - rowValues.clear(); - try { - StandardMBean bean = new StandardMBean(monitorDetails.getMonitor(), Monitor.class); - - Calendar calendar = new GregorianCalendar(); - rowValues.add(monitorDetails); - - rowValues.add(((Monitor) bean.getImplementation()).getStat()); - rowValues.add(DateUtil.DEFAULT_MS_DATE_FORMATTER.format(calendar.getTime())); - Result - result = new Result(ResultFileDetails.CSV, ResultFileDetails.CSV_MONITOR.getHeader().toString(), rowValues); - resultHandler.write(result); - } catch (Exception e) { - throw new FileLoaderRuntimeException("Could not log monitor result.", e); + @Override public synchronized void complete() { + this.shouldStop.set(true); + } + + @Override public Runnable execute() { + return new Runnable() { + @Override public void run() { + try { + while (!shouldStop()) { + isRunning.set(true); + List rowValues = new ArrayList<String>(); + synchronized (resultHandler) { + for (MonitorDetails monitorDetails : MONITOR_DETAILS_LIST) { + rowValues.clear(); + try { + StandardMBean + bean = + new StandardMBean(monitorDetails.getMonitor(), + Monitor.class); + + Calendar calendar = new GregorianCalendar(); + rowValues.add(monitorDetails); + + rowValues.add(((Monitor) bean.getImplementation()).getStat()); + rowValues.add(DateUtil.DEFAULT_MS_DATE_FORMATTER + .format(calendar.getTime())); + Result + result = + new Result(ResultFileDetails.CSV, + ResultFileDetails.CSV_MONITOR.getHeader() + .toString(), rowValues); + resultHandler.write(result); + } catch (Exception e) { + throw new FileLoaderRuntimeException( + "Could not log monitor result.", e); + } + rowCount.getAndIncrement(); + } + try { + resultHandler.flush(); + Thread.sleep(getMonitorFrequency()); + } catch (Exception e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } } - rowCount.getAndIncrement(); } + } finally { try { - Thread.sleep(getMonitorFrequency()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - e.printStackTrace(); + isRunning.set(false); + if (resultHandler != null) { + resultHandler.close(); + } + } catch (Exception e) { + throw new FileLoaderRuntimeException("Could not close monitor results.", e); } } } - } finally { - try { - isRunning = false; - if (resultHandler != null) { - resultHandler.flush(); - resultHandler.close(); - - } - } catch (Exception e) { - throw new FileLoaderRuntimeException("Could not close monitor results.", e); - } - } - + }; } public long getMonitorFrequency() { - return monitorFrequency; + return monitorFrequency.get(); } - public synchronized boolean shouldStop() { - return shouldStop; + public boolean shouldStop() { + return shouldStop.get(); } - public synchronized void stop() { - this.shouldStop = true; - } - - public synchronized long getRowCount() { + // Convenience method for testing. + @SuppressWarnings("unused") + public long getRowCount() { return rowCount.get(); } - public synchronized boolean isRunning() { - return isRunning; + public boolean isRunning() { + return isRunning.get(); } /** @@ -158,7 +176,9 @@ public class MonitorManager implements Runnable { ResultHandler handler = null; try { if (resultHandler.isClosed()) { - handler = new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV); + handler = + new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, + ResultFileDetails.CSV); return handler.read(); } else { return resultHandler.read(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java deleted file mode 100644 index c521822..0000000 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.phoenix.pherf.loaddata; - -import java.math.BigDecimal; -import java.sql.Connection; -import java.sql.Date; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Types; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.phoenix.pherf.result.ResultUtil; -import org.apache.phoenix.pherf.util.ResourceList; -import org.apache.phoenix.pherf.util.RowCalculator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.phoenix.pherf.PherfConstants; -import org.apache.phoenix.pherf.configuration.Column; -import org.apache.phoenix.pherf.configuration.DataModel; -import org.apache.phoenix.pherf.configuration.Scenario; -import org.apache.phoenix.pherf.configuration.XMLConfigParser; -import org.apache.phoenix.pherf.exception.PherfException; -import org.apache.phoenix.pherf.result.DataLoadThreadTime; -import org.apache.phoenix.pherf.result.DataLoadTimeSummary; -import org.apache.phoenix.pherf.rules.DataValue; -import org.apache.phoenix.pherf.rules.RulesApplier; -import org.apache.phoenix.pherf.util.PhoenixUtil; - -public class DataLoader { - private static final Logger logger = LoggerFactory.getLogger(DataLoader.class); - private final PhoenixUtil pUtil; - private final XMLConfigParser parser; - private final RulesApplier rulesApplier; - private final ResultUtil resultUtil; - private final ExecutorService pool; - - private final int threadPoolSize; - private final int batchSize; - - public DataLoader(XMLConfigParser parser) throws Exception { - this(new PhoenixUtil(), parser); - } - - public DataLoader(PhoenixUtil phoenixUtil, XMLConfigParser parser) throws Exception{ - this(phoenixUtil, PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES), parser); - } - - /** - * Default the writers to use up all available cores for threads. - * - * @param parser - * @throws Exception - */ - public DataLoader(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser) throws Exception { - this.pUtil = phoenixUtil; - this.parser = parser; - this.rulesApplier = new RulesApplier(parser); - this.resultUtil = new ResultUtil(); - int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool")); - this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size; - this.pool = Executors.newFixedThreadPool(this.threadPoolSize); - String bSize = properties.getProperty("pherf.default.dataloader.batchsize"); - this.batchSize = (bSize == null) ? PherfConstants.DEFAULT_BATCH_SIZE : Integer.parseInt(bSize); - } - - public void execute() throws Exception { - try { - DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary(); - DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime(); - - for (Scenario scenario : getParser().getScenarios()) { - List<Future> writeBatches = new ArrayList<Future>(); - logger.info("\nLoading " + scenario.getRowCount() - + " rows for " + scenario.getTableName()); - long start = System.currentTimeMillis(); - - RowCalculator rowCalculator = new RowCalculator(getThreadPoolSize(), scenario.getRowCount()); - for (int i = 0; i < getThreadPoolSize(); i++) { - List<Column> phxMetaCols = pUtil.getColumnsFromPhoenix( - scenario.getSchemaName(), - scenario.getTableNameWithoutSchemaName(), - pUtil.getConnection()); - int threadRowCount = rowCalculator.getNext(); - logger.info("Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows."); - Future<Info> write = upsertData(scenario, phxMetaCols, - scenario.getTableName(), threadRowCount, dataLoadThreadTime); - writeBatches.add(write); - } - - if (writeBatches.isEmpty()) { - throw new PherfException( - "Holy shit snacks! Throwing up hands in disbelief and exiting. Could not write data for some unknown reason."); - } - - int sumRows = 0, sumDuration = 0; - // Wait for all the batch threads to complete - for (Future<Info> write : writeBatches) { - Info writeInfo = write.get(); - sumRows += writeInfo.getRowCount(); - sumDuration += writeInfo.getDuration(); - logger.info("Executor writes complete with row count (" - + writeInfo.getRowCount() - + ") in Ms (" - + writeInfo.getDuration() + ")"); - } - logger.info("Writes completed with total row count (" + sumRows - + ") with total time of(" + sumDuration + ") Ms"); - dataLoadTimeSummary.add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start)); - - - // always update stats for Phoenix base tables - updatePhoenixStats(scenario.getTableName()); - } - resultUtil.write(dataLoadTimeSummary); - resultUtil.write(dataLoadThreadTime); - - } finally { - pool.shutdown(); - } - } - - /** - * TODO Move this method to PhoenixUtil - * Update Phoenix table stats - * - * @param tableName - * @throws Exception - */ - public void updatePhoenixStats(String tableName) throws Exception { - logger.info("Updating stats for " + tableName); - pUtil.executeStatement("UPDATE STATISTICS " + tableName); - } - - public Future<Info> upsertData(final Scenario scenario, - final List<Column> columns, final String tableName, - final int rowCount, final DataLoadThreadTime dataLoadThreadTime) { - Future<Info> future = pool.submit(new Callable<Info>() { - @Override - public Info call() throws Exception { - int rowsCreated = 0; - Info info = null; - long start = 0, duration = 0, totalDuration = 0; - SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Connection connection = null; - try { - connection = pUtil.getConnection(); - long logStartTime = System.currentTimeMillis(); - for (int i = 0; i < rowCount; i++) { - String sql = buildSql(columns, tableName); - PreparedStatement stmt = connection - .prepareStatement(sql); - stmt = buildStatement(scenario, columns, stmt, simpleDateFormat); - start = System.currentTimeMillis(); - rowsCreated += stmt.executeUpdate(); - stmt.close(); - if ((i % getBatchSize()) == 0) { - connection.commit(); - duration = System.currentTimeMillis() - start; - logger.info("Committed Batch. Total " + tableName + " rows for this thread (" + this.hashCode() + ") in (" - + duration + ") Ms"); - - if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) { - dataLoadThreadTime.add(tableName, Thread.currentThread().getName(), i, System.currentTimeMillis() - logStartTime); - logStartTime = System.currentTimeMillis(); - } - } - } - } finally { - if (connection != null) { - try { - connection.commit(); - duration = System.currentTimeMillis() - start; - logger.info("Committed Final Batch. Duration (" + duration + ") Ms"); - connection.close(); - } catch (SQLException e) { - // Swallow since we are closing anyway - e.printStackTrace(); - } - } - } - totalDuration = System.currentTimeMillis() - start; - return new Info(totalDuration, rowsCreated); - } - }); - return future; - } - - private PreparedStatement buildStatement(Scenario scenario, - List<Column> columns, PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception { - int count = 1; - for (Column column : columns) { - - DataValue dataValue = getRulesApplier().getDataForRule(scenario, - column); - switch (column.getType()) { - case VARCHAR: - if (dataValue.getValue().equals("")) { - statement.setNull(count, Types.VARCHAR); - } else { - statement.setString(count, dataValue.getValue()); - } - break; - case CHAR: - if (dataValue.getValue().equals("")) { - statement.setNull(count, Types.CHAR); - } else { - statement.setString(count, dataValue.getValue()); - } - break; - case DECIMAL: - if (dataValue.getValue().equals("")) { - statement.setNull(count, Types.DECIMAL); - } else { - statement.setBigDecimal(count, - new BigDecimal(dataValue.getValue())); - } - break; - case INTEGER: - if (dataValue.getValue().equals("")) { - statement.setNull(count, Types.INTEGER); - } else { - statement.setInt(count, - Integer.parseInt(dataValue.getValue())); - } - break; - case DATE: - if (dataValue.getValue().equals("")) { - statement.setNull(count, Types.DATE); - } else { - Date date = new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime()); - statement.setDate(count, date); - } - break; - default: - break; - } - count++; - } - return statement; - } - - private String buildSql(final List<Column> columns, final String tableName) { - StringBuilder builder = new StringBuilder(); - builder.append("upsert into "); - builder.append(tableName); - builder.append(" ("); - int count = 1; - for (Column column : columns) { - builder.append(column.getName()); - if (count < columns.size()) { - builder.append(","); - } else { - builder.append(")"); - } - count++; - } - builder.append(" VALUES ("); - for (int i = 0; i < columns.size(); i++) { - if (i < columns.size() - 1) { - builder.append("?,"); - } else { - builder.append("?)"); - } - } - return builder.toString(); - } - - public XMLConfigParser getParser() { - return parser; - } - - public RulesApplier getRulesApplier() { - return rulesApplier; - } - - public int getBatchSize() { - return batchSize; - } - - public int getThreadPoolSize() { - return threadPoolSize; - } - - private class Info { - - private final int rowCount; - private final long duration; - - public Info(long duration, int rows) { - this(0, 0, 0, duration, rows); - } - - public Info(int regionSize, int completedIterations, int timesSeen, - long duration, int rows) { - this.duration = duration; - this.rowCount = rows; - } - - public long getDuration() { - return duration; - } - - public int getRowCount() { - return rowCount; - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java index 23dcdd5..e5553cc 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java @@ -18,61 +18,68 @@ package org.apache.phoenix.pherf.result; +import org.apache.phoenix.pherf.PherfConstants; + import java.util.ArrayList; import java.util.List; -import org.apache.phoenix.pherf.PherfConstants; - public class DataLoadThreadTime { - private List<WriteThreadTime> threadTime = new ArrayList<WriteThreadTime>(); + private List<WriteThreadTime> threadTime = new ArrayList<WriteThreadTime>(); + + public List<WriteThreadTime> getThreadTime() { + return threadTime; + } - public List<WriteThreadTime> getThreadTime() { - return threadTime; - } + public void add(String tableName, String threadName, long rowsUpserted, + long timeInMsPerMillionRows) { + threadTime.add(new WriteThreadTime(tableName, threadName, rowsUpserted, + timeInMsPerMillionRows)); + } - public void add(String tableName, String threadName, int rowsUpserted, long timeInMsPerMillionRows) { - threadTime.add(new WriteThreadTime(tableName, threadName, rowsUpserted, timeInMsPerMillionRows)); - } - - public String getCsvTitle() { - return "TABLE_NAME,THREAD_NAME,ROWS_UPSERTED,TIME_IN_MS_PER_" + PherfConstants.LOG_PER_NROWS + "_ROWS\n"; - } + public String getCsvTitle() { + return "TABLE_NAME,THREAD_NAME,ROWS_UPSERTED,TIME_IN_MS_PER_" + PherfConstants.LOG_PER_NROWS + + "_ROWS\n"; + } } class WriteThreadTime { - private String tableName; - private String threadName; - private int rowsUpserted; - private long timeInMsPerMillionRows; - - public WriteThreadTime(String tableName, String threadName, int rowsUpserted, long timeInMsPerMillionRows) { - this.tableName = tableName; - this.threadName = threadName; - this.rowsUpserted = rowsUpserted; - this.timeInMsPerMillionRows = timeInMsPerMillionRows; - } - - public String getTableName() { - return tableName; - } - public String getThreadName() { - return threadName; - } - public long getTimeInMsPerMillionRows() { - return timeInMsPerMillionRows; - } + private String tableName; + private String threadName; + private long rowsUpserted; + private long timeInMsPerMillionRows; + + public WriteThreadTime(String tableName, String threadName, long rowsUpserted, + long timeInMsPerMillionRows) { + this.tableName = tableName; + this.threadName = threadName; + this.rowsUpserted = rowsUpserted; + this.timeInMsPerMillionRows = timeInMsPerMillionRows; + } + + public String getTableName() { + return tableName; + } + + public String getThreadName() { + return threadName; + } + + public long getTimeInMsPerMillionRows() { + return timeInMsPerMillionRows; + } - public List<ResultValue> getCsvRepresentation(ResultUtil util) { + public List<ResultValue> getCsvRepresentation(ResultUtil util) { List<ResultValue> rowValues = new ArrayList<>(); rowValues.add(new ResultValue(util.convertNull(getTableName()))); rowValues.add(new ResultValue(util.convertNull(getThreadName()))); rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRowsUpserted())))); - rowValues.add(new ResultValue(util.convertNull(String.valueOf(getTimeInMsPerMillionRows())))); + rowValues.add(new ResultValue( + util.convertNull(String.valueOf(getTimeInMsPerMillionRows())))); return rowValues; - } + } - public int getRowsUpserted() { - return rowsUpserted; - } + public long getRowsUpserted() { + return rowsUpserted; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java index bb23f16..0ff5c59 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java @@ -22,29 +22,29 @@ import java.util.ArrayList; import java.util.List; public class DataLoadTimeSummary { - private List<TableLoadTime> tableLoadTime = new ArrayList<TableLoadTime>(); + private List<TableLoadTime> tableLoadTime = new ArrayList<TableLoadTime>(); - public List<TableLoadTime> getTableLoadTime() { - return tableLoadTime; - } - - public void add(String tableName, int rowCount, int durationInMs) { - tableLoadTime.add(new TableLoadTime(tableName, rowCount, durationInMs)); - } + public List<TableLoadTime> getTableLoadTime() { + return tableLoadTime; + } + + public void add(String tableName, int rowCount, int durationInMs) { + tableLoadTime.add(new TableLoadTime(tableName, rowCount, durationInMs)); + } } class TableLoadTime { - private int durationInMs; - private String tableName; - private int rowCount; + private int durationInMs; + private String tableName; + private int rowCount; + + public TableLoadTime(String tableName, int rowCount, int durationInMs) { + this.tableName = tableName; + this.rowCount = rowCount; + this.durationInMs = durationInMs; + } - public TableLoadTime(String tableName, int rowCount, int durationInMs) { - this.tableName = tableName; - this.rowCount = rowCount; - this.durationInMs = durationInMs; - } - - public List<ResultValue> getCsvRepresentation(ResultUtil util) { + public List<ResultValue> getCsvRepresentation(ResultUtil util) { List<ResultValue> rowValues = new ArrayList<>(); rowValues.add(new ResultValue(util.convertNull(getTableName()))); rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRowCount())))); @@ -53,15 +53,15 @@ class TableLoadTime { return rowValues; } - public int getDurationInMs() { - return durationInMs; - } + public int getDurationInMs() { + return durationInMs; + } - public String getTableName() { - return tableName; - } + public String getTableName() { + return tableName; + } - public int getRowCount() { - return rowCount; - } + public int getRowCount() { + return rowCount; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java index 72920fa..5c07ffe 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java @@ -18,61 +18,57 @@ package org.apache.phoenix.pherf.result; -import java.util.ArrayList; -import java.util.List; +import org.apache.phoenix.pherf.configuration.DataModel; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; -import org.apache.phoenix.pherf.configuration.DataModel; +@XmlRootElement(namespace = "org.apache.phoenix.pherf.result") public class DataModelResult + extends DataModel { + private List<ScenarioResult> scenarioResult = new ArrayList<ScenarioResult>(); + private String zookeeper; -@XmlRootElement(namespace = "org.apache.phoenix.pherf.result") -public class DataModelResult extends DataModel { - private List<ScenarioResult> scenarioResult = new ArrayList<ScenarioResult>(); - private String zookeeper; + public List<ScenarioResult> getScenarioResult() { + return scenarioResult; + } - public List<ScenarioResult> getScenarioResult() { - return scenarioResult; - } + @SuppressWarnings("unused") public void setScenarioResult(List<ScenarioResult> scenarioResult) { + this.scenarioResult = scenarioResult; + } - @SuppressWarnings("unused") - public void setScenarioResult(List<ScenarioResult> scenarioResult) { - this.scenarioResult = scenarioResult; - } - - public DataModelResult() { - } + public DataModelResult() { + } - private DataModelResult(String name, String release, String zookeeper) { + private DataModelResult(String name, String zookeeper) { this.setName(name); - this.setRelease(release); this.zookeeper = zookeeper; } /** * Copy constructor - * + * * @param dataModelResult */ public DataModelResult(DataModelResult dataModelResult) { - this(dataModelResult.getName(), dataModelResult.getRelease(), dataModelResult.getZookeeper()); + this(dataModelResult.getName(), dataModelResult.getZookeeper()); this.scenarioResult = dataModelResult.getScenarioResult(); } - - public DataModelResult(DataModel dataModel, String zookeeper) { - this(dataModel.getName(), dataModel.getRelease(), zookeeper); - } - - public DataModelResult(DataModel dataModel) { - this(dataModel, null); - } - @XmlAttribute() - public String getZookeeper() { - return zookeeper; - } + public DataModelResult(DataModel dataModel, String zookeeper) { + this(dataModel.getName(), zookeeper); + } + + public DataModelResult(DataModel dataModel) { + this(dataModel, null); + } + + @XmlAttribute() public String getZookeeper() { + return zookeeper; + } - public void setZookeeper(String zookeeper) { - this.zookeeper = zookeeper; - } + public void setZookeeper(String zookeeper) { + this.zookeeper = zookeeper; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java index b5fd082..1a682da 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java @@ -18,14 +18,14 @@ package org.apache.phoenix.pherf.result; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - import org.apache.phoenix.pherf.PherfConstants.RunMode; import org.apache.phoenix.pherf.configuration.Query; import org.apache.phoenix.util.DateUtil; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + public class QueryResult extends Query { private List<ThreadTime> threadTimes = new ArrayList<ThreadTime>(); @@ -47,8 +47,7 @@ public class QueryResult extends Query { this.setId(query.getId()); } - @SuppressWarnings("unused") - public QueryResult() { + @SuppressWarnings("unused") public QueryResult() { } public Date getStartTime() { @@ -136,8 +135,8 @@ public class QueryResult extends Query { } private String getStartTimeText() { - return (null == this.getStartTime()) - ? "" - : DateUtil.DEFAULT_MS_DATE_FORMATTER.format(this.getStartTime()); + return (null == this.getStartTime()) ? + "" : + DateUtil.DEFAULT_MS_DATE_FORMATTER.format(this.getStartTime()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java index 9010c21..c2be5a3 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java @@ -18,31 +18,31 @@ package org.apache.phoenix.pherf.result; +import org.apache.phoenix.pherf.configuration.QuerySet; + import java.util.ArrayList; import java.util.List; -import org.apache.phoenix.pherf.configuration.QuerySet; - public class QuerySetResult extends QuerySet { - - private List<QueryResult> queryResults = new ArrayList<QueryResult>(); - - public QuerySetResult(QuerySet querySet) { - this.setConcurrency(querySet.getConcurrency()); - this.setNumberOfExecutions(querySet.getNumberOfExecutions()); - this.setExecutionDurationInMs(querySet.getExecutionDurationInMs()); - this.setExecutionType(querySet.getExecutionType()); - } - - public QuerySetResult() { - } - - public List<QueryResult> getQueryResults() { - return queryResults; - } + + private List<QueryResult> queryResults = new ArrayList<>(); + + public QuerySetResult(QuerySet querySet) { + this.setConcurrency(querySet.getConcurrency()); + this.setNumberOfExecutions(querySet.getNumberOfExecutions()); + this.setExecutionDurationInMs(querySet.getExecutionDurationInMs()); + this.setExecutionType(querySet.getExecutionType()); + } + + public QuerySetResult() { + } + + public List<QueryResult> getQueryResults() { + return queryResults; + } @SuppressWarnings("unused") public void setQueryResults(List<QueryResult> queryResults) { - this.queryResults = queryResults; - } + this.queryResults = queryResults; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java index 4ccdd2b..158ed11 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java @@ -18,10 +18,10 @@ package org.apache.phoenix.pherf.result; -import java.util.List; - import org.apache.phoenix.pherf.result.file.ResultFileDetails; +import java.util.List; + /** * Common container for Pherf results. */ @@ -33,10 +33,9 @@ public class Result { private final String header; /** - * - * @param type {@link org.apache.phoenix.pherf.result.file.ResultFileDetails} Currently unused, but gives metadata about the - * contents of the result. - * @param header Used for CSV, otherwise pass null. For CSV pass comma separated string of header fields. + * @param type {@link org.apache.phoenix.pherf.result.file.ResultFileDetails} Currently unused, but gives metadata about the + * contents of the result. + * @param header Used for CSV, otherwise pass null. For CSV pass comma separated string of header fields. * @param messageValues List<{@link ResultValue} All fields combined represent the data * for a row to be written. */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/18070ccc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java index f650cbb..5b71300 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java @@ -29,9 +29,14 @@ import java.util.List; */ public interface ResultHandler { public void write(Result result) throws Exception; + public void flush() throws Exception; + public void close() throws Exception; + public List<Result> read() throws Exception; + public boolean isClosed(); + public ResultFileDetails getResultFileDetails(); }