Repository: phoenix Updated Branches: refs/heads/4.4-HBase-1.0 b72ce4ce3 -> ac7dc675e
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java index 78f18ca..c9333a0 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java @@ -43,153 +43,160 @@ import difflib.DiffUtils; import difflib.Patch; public class QueryVerifier { - private PhoenixUtil pUtil = new PhoenixUtil(); - private static final Logger logger = LoggerFactory - .getLogger(QueryVerifier.class); - private boolean useTemporaryOutput; - private String directoryLocation; - - public QueryVerifier(boolean useTemporaryOutput) { - this.useTemporaryOutput = useTemporaryOutput; - this.directoryLocation = this.useTemporaryOutput ? - PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR; - - ensureBaseDirExists(); - } - - /*** - * Export query resultSet to CSV file - * @param query - * @throws Exception - */ - public String exportCSV(Query query) throws Exception { - Connection conn = null; - PreparedStatement statement = null; - ResultSet rs = null; - String fileName = getFileName(query); - FileOutputStream fos = new FileOutputStream(fileName); - try { - conn = pUtil.getConnection(query.getTenantId()); - statement = conn.prepareStatement(query.getStatement()); - boolean isQuery = statement.execute(); - if (isQuery) { - rs = statement.executeQuery(); - int columnCount = rs.getMetaData().getColumnCount(); - while (rs.next()) { - for (int columnNum = 1; columnNum <= columnCount; columnNum++) { - fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER).getBytes()); - } - fos.write(PherfConstants.NEW_LINE.getBytes()); - } - } else { - conn.commit(); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (rs != null) rs.close(); - if (statement != null) statement.close(); - if (conn != null) conn.close(); - fos.flush(); - fos.close(); - } - return fileName; - } - - /*** - * Do a diff between exported query results and temporary CSV file - * @param query - * @param newCSV - * @return - */ - public boolean doDiff(Query query, String newCSV) { + private PhoenixUtil pUtil = PhoenixUtil.create(); + private static final Logger logger = LoggerFactory.getLogger(QueryVerifier.class); + private boolean useTemporaryOutput; + private String directoryLocation; + + public QueryVerifier(boolean useTemporaryOutput) { + this.useTemporaryOutput = useTemporaryOutput; + this.directoryLocation = + this.useTemporaryOutput ? PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR; + + ensureBaseDirExists(); + } + + /** + * Export query resultSet to CSV file + * + * @param query + * @throws Exception + */ + public String exportCSV(Query query) throws Exception { + Connection conn = null; + PreparedStatement statement = null; + ResultSet rs = null; + String fileName = getFileName(query); + FileOutputStream fos = new FileOutputStream(fileName); + try { + conn = pUtil.getConnection(query.getTenantId()); + statement = conn.prepareStatement(query.getStatement()); + boolean isQuery = statement.execute(); + if (isQuery) { + rs = statement.executeQuery(); + int columnCount = rs.getMetaData().getColumnCount(); + while (rs.next()) { + for (int columnNum = 1; columnNum <= columnCount; columnNum++) { + fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER) + .getBytes()); + } + fos.write(PherfConstants.NEW_LINE.getBytes()); + } + } else { + conn.commit(); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (rs != null) rs.close(); + if (statement != null) statement.close(); + if (conn != null) conn.close(); + fos.flush(); + fos.close(); + } + return fileName; + } + + /** + * Do a diff between exported query results and temporary CSV file + * + * @param query + * @param newCSV + * @return + */ + public boolean doDiff(Query query, String newCSV) { List<String> original = fileToLines(getCSVName(query, PherfConstants.EXPORT_DIR, "")); - List<String> newLines = fileToLines(newCSV); - + List<String> newLines = fileToLines(newCSV); + Patch patch = DiffUtils.diff(original, newLines); if (patch.getDeltas().isEmpty()) { - logger.info("Match: " + query.getId() + " with " + newCSV); - return true; + logger.info("Match: " + query.getId() + " with " + newCSV); + return true; } else { - logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV); - return false; + logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV); + return false; } - } - - /*** - * Helper method to load file - * @param filename - * @return - */ + } + + /** + * Helper method to load file + * + * @param filename + * @return + */ private static List<String> fileToLines(String filename) { - List<String> lines = new LinkedList<String>(); - String line = ""; - try { - BufferedReader in = new BufferedReader(new FileReader(filename)); - while ((line = in.readLine()) != null) { - lines.add(line); - } - in.close(); - } catch (IOException e) { - e.printStackTrace(); + List<String> lines = new LinkedList<String>(); + String line = ""; + try { + BufferedReader in = new BufferedReader(new FileReader(filename)); + while ((line = in.readLine()) != null) { + lines.add(line); } - - return lines; + in.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + return lines; } /** * Get explain plan for a query + * * @param query * @return * @throws SQLException */ - public String getExplainPlan(Query query) throws SQLException { - Connection conn = null; - ResultSet rs = null; - PreparedStatement statement = null; - StringBuilder buf = new StringBuilder(); - try { - conn = pUtil.getConnection(query.getTenantId()); - statement = conn.prepareStatement("EXPLAIN " + query.getStatement()); - rs = statement.executeQuery(); - while (rs.next()) { - buf.append(rs.getString(1).trim().replace(",", "-")); - } - statement.close(); - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (rs != null) rs.close(); - if (statement != null) statement.close(); - if (conn != null) conn.close(); - } - return buf.toString(); - } - - /*** + public String getExplainPlan(Query query) throws SQLException { + Connection conn = null; + ResultSet rs = null; + PreparedStatement statement = null; + StringBuilder buf = new StringBuilder(); + try { + conn = pUtil.getConnection(query.getTenantId()); + statement = conn.prepareStatement("EXPLAIN " + query.getStatement()); + rs = statement.executeQuery(); + while (rs.next()) { + buf.append(rs.getString(1).trim().replace(",", "-")); + } + statement.close(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (rs != null) rs.close(); + if (statement != null) statement.close(); + if (conn != null) conn.close(); + } + return buf.toString(); + } + + /** * Helper method to generate CSV file name + * * @param query * @return * @throws FileNotFoundException */ - private String getFileName(Query query) throws FileNotFoundException { - String tempExt = ""; - if (this.useTemporaryOutput) { - tempExt = "_" + java.util.UUID.randomUUID().toString(); - } - return getCSVName(query, this.directoryLocation, tempExt); - } - - private String getCSVName(Query query, String directory, String tempExt) { - String csvFile = directory + PherfConstants.PATH_SEPARATOR - + query.getId() + tempExt + Extension.CSV.toString(); - return csvFile; - } - - private void ensureBaseDirExists() { - File baseDir = new File(this.directoryLocation); - if (!baseDir.exists()) { - baseDir.mkdir(); - } - } + private String getFileName(Query query) throws FileNotFoundException { + String tempExt = ""; + if (this.useTemporaryOutput) { + tempExt = "_" + java.util.UUID.randomUUID().toString(); + } + return getCSVName(query, this.directoryLocation, tempExt); + } + + private String getCSVName(Query query, String directory, String tempExt) { + String + csvFile = + directory + PherfConstants.PATH_SEPARATOR + query.getId() + tempExt + Extension.CSV + .toString(); + return csvFile; + } + + private void ensureBaseDirExists() { + File baseDir = new File(this.directoryLocation); + if (!baseDir.exists()) { + baseDir.mkdir(); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java new file mode 100644 index 0000000..16a493e --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java @@ -0,0 +1,10 @@ +package org.apache.phoenix.pherf.workload; + +public interface Workload { + public Runnable execute() throws Exception; + + /** + * Use this method to perform any cleanup or forced shutdown of the thread. + */ + public void complete(); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java index cf2f038..a65b4aa 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java @@ -19,95 +19,96 @@ package org.apache.phoenix.pherf.workload; import org.apache.phoenix.pherf.PherfConstants; -import org.apache.phoenix.pherf.PherfConstants.RunMode; -import org.apache.phoenix.pherf.configuration.XMLConfigParser; -import org.apache.phoenix.pherf.jmx.MonitorManager; -import org.apache.phoenix.pherf.loaddata.DataLoader; - -import org.apache.phoenix.pherf.util.ResourceList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; public class WorkloadExecutor { private static final Logger logger = LoggerFactory.getLogger(WorkloadExecutor.class); - private final XMLConfigParser parser; - private MonitorManager monitor; - private Future monitorThread; private final int poolSize; - private final ExecutorService pool; + // Jobs can be accessed by multiple threads + private final Map<Workload, Future> jobs = new ConcurrentHashMap<>(); + private final ExecutorService pool; public WorkloadExecutor() throws Exception { this(PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES)); } - public WorkloadExecutor(Properties properties) throws Exception{ - this(properties,PherfConstants.DEFAULT_FILE_PATTERN); + public WorkloadExecutor(Properties properties) throws Exception { + this(properties, new ArrayList()); } - public WorkloadExecutor(Properties properties, String filePattern) throws Exception { - this(properties, - new XMLConfigParser(filePattern), - true); + public WorkloadExecutor(Properties properties, List<Workload> workloads) throws Exception { + this.poolSize = + (properties.getProperty("pherf.default.threadpool") == null) ? + PherfConstants.DEFAULT_THREAD_POOL_SIZE : + Integer.parseInt(properties.getProperty("pherf.default.threadpool")); + + this.pool = Executors.newFixedThreadPool(this.poolSize); + init(workloads); } - public WorkloadExecutor(Properties properties, XMLConfigParser parser, boolean monitor) throws Exception { - this.parser = parser; - this.poolSize = (properties.getProperty("pherf.default.threadpool") == null) - ? PherfConstants.DEFAULT_THREAD_POOL_SIZE - : Integer.parseInt(properties.getProperty("pherf.default.threadpool")); + public void add(Workload workload) throws Exception { + this.jobs.put(workload, pool.submit(workload.execute())); + } - this.pool = Executors.newFixedThreadPool(this.poolSize); - if (monitor) { - initMonitor(Integer.parseInt(properties.getProperty("pherf.default.monitorFrequency"))); + /** + * Blocks on waiting for all workloads to finish. If a + * {@link org.apache.phoenix.pherf.workload.Workload} Requires complete() to be called, it must + * be called prior to using this method. Otherwise it will block infinitely. + */ + public void get() { + for (Workload workload : jobs.keySet()) { + get(workload); } } /** - * Executes all scenarios dataload + * Calls the {@link java.util.concurrent.Future#get()} method pertaining to this workflow. + * Once the Future competes, the workflow is removed from the list. * - * @throws Exception + * @param workload Key entry in the HashMap */ - public void executeDataLoad() throws Exception { - logger.info("\n\nStarting Data Loader..."); - DataLoader dataLoader = new DataLoader(parser); - dataLoader.execute(); + public void get(Workload workload) { + try { + Future future = jobs.get(workload); + future.get(); + jobs.remove(workload); + } catch (InterruptedException | ExecutionException e) { + logger.error("", e); + } } /** - * Executes all scenario multi-threaded query sets - * - * @param queryHint - * @throws Exception + * Complete all workloads in the list. + * Entries in the job Map will persist until {#link WorkloadExecutorNew#get()} is called */ - public void executeMultithreadedQueryExecutor(String queryHint, boolean export, RunMode runMode) throws Exception { - logger.info("\n\nStarting Query Executor..."); - QueryExecutor queryExecutor = new QueryExecutor(parser); - queryExecutor.execute(queryHint, export, runMode); + public void complete() { + for (Workload workload : jobs.keySet()) { + workload.complete(); + } } - public void shutdown() throws Exception { - if (null != monitor && monitor.isRunning()) { - this.monitor.stop(); - this.monitorThread.get(60, TimeUnit.SECONDS); - this.pool.shutdown(); - } + public void shutdown() { + // Make sure any Workloads still on pool have been properly shutdown + complete(); + pool.shutdownNow(); } - // Just used for testing - public XMLConfigParser getParser() { - return parser; + public ExecutorService getPool() { + return pool; } - private void initMonitor(int monitorFrequency) throws Exception { - this.monitor = new MonitorManager(monitorFrequency); - monitorThread = pool.submit(this.monitor); + private void init(List<Workload> workloads) throws Exception { + for (Workload workload : workloads) { + this.jobs.put(workload, pool.submit(workload.execute())); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java new file mode 100644 index 0000000..305521b --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.workload; + +import org.apache.phoenix.pherf.PherfConstants; +import org.apache.phoenix.pherf.configuration.Column; +import org.apache.phoenix.pherf.configuration.Scenario; +import org.apache.phoenix.pherf.configuration.WriteParams; +import org.apache.phoenix.pherf.configuration.XMLConfigParser; +import org.apache.phoenix.pherf.exception.PherfException; +import org.apache.phoenix.pherf.result.DataLoadThreadTime; +import org.apache.phoenix.pherf.result.DataLoadTimeSummary; +import org.apache.phoenix.pherf.result.ResultUtil; +import org.apache.phoenix.pherf.rules.DataValue; +import org.apache.phoenix.pherf.rules.RulesApplier; +import org.apache.phoenix.pherf.util.PhoenixUtil; +import org.apache.phoenix.pherf.util.RowCalculator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class WriteWorkload implements Workload { + private static final Logger logger = LoggerFactory.getLogger(WriteWorkload.class); + private final PhoenixUtil pUtil; + private final XMLConfigParser parser; + private final RulesApplier rulesApplier; + private final ResultUtil resultUtil; + private final ExecutorService pool; + private final WriteParams writeParams; + private final Scenario scenario; + private final long threadSleepDuration; + + private final int threadPoolSize; + private final int batchSize; + + public WriteWorkload(XMLConfigParser parser) throws Exception { + this(PhoenixUtil.create(), parser); + } + + public WriteWorkload(PhoenixUtil util, XMLConfigParser parser) throws Exception { + this(util, parser, null); + } + + public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser, Scenario scenario) + throws Exception { + this(phoenixUtil, PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES), + parser, scenario); + } + + /** + * Default the writers to use up all available cores for threads. If writeParams are used in + * the config files, they will override the defaults. writeParams are used for read/write mixed + * workloads. + * TODO extract notion of the scenario list and have 1 write workload per scenario + * + * @param phoenixUtil {@link org.apache.phoenix.pherf.util.PhoenixUtil} Query helper + * @param properties {@link java.util.Properties} default properties to use + * @param parser {@link org.apache.phoenix.pherf.configuration.XMLConfigParser} + * @param scenario {@link org.apache.phoenix.pherf.configuration.Scenario} If null is passed + * it will run against all scenarios in the parsers list. + * @throws Exception + */ + public WriteWorkload(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser, + Scenario scenario) throws Exception { + this.pUtil = phoenixUtil; + this.parser = parser; + this.rulesApplier = new RulesApplier(parser); + this.resultUtil = new ResultUtil(); + + // Overwrite defaults properties with those given in the configuration. This indicates the + // scenario is a R/W mixed workload. + if (scenario != null) { + this.scenario = scenario; + writeParams = scenario.getWriteParams(); + threadSleepDuration = writeParams.getThreadSleepDuration(); + } else { + writeParams = null; + this.scenario = null; + threadSleepDuration = 0; + } + + int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool")); + + this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size; + + // TODO Move pool management up to WorkloadExecutor + this.pool = Executors.newFixedThreadPool(this.threadPoolSize); + + String + bSize = + (writeParams == null) || (writeParams.getBatchSize() == Long.MIN_VALUE) ? + properties.getProperty("pherf.default.dataloader.batchsize") : + String.valueOf(writeParams.getBatchSize()); + this.batchSize = + (bSize == null) ? PherfConstants.DEFAULT_BATCH_SIZE : Integer.parseInt(bSize); + } + + @Override public void complete() { + } + + public Runnable execute() throws Exception { + return new Runnable() { + @Override public void run() { + try { + DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary(); + DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime(); + + if (WriteWorkload.this.scenario == null) { + for (Scenario scenario : getParser().getScenarios()) { + exec(dataLoadTimeSummary, dataLoadThreadTime, scenario); + } + } else { + exec(dataLoadTimeSummary, dataLoadThreadTime, WriteWorkload.this.scenario); + } + resultUtil.write(dataLoadTimeSummary); + resultUtil.write(dataLoadThreadTime); + + } catch (Exception e) { + logger.warn("", e); + } + } + }; + } + + private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary, + DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception { + logger.info("\nLoading " + scenario.getRowCount() + " rows for " + scenario.getTableName()); + long start = System.currentTimeMillis(); + + List<Future> writeBatches = getBatches(dataLoadThreadTime, scenario); + + waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches); + + // always update stats for Phoenix base tables + updatePhoenixStats(scenario.getTableName()); + } + + private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario) + throws Exception { + RowCalculator + rowCalculator = + new RowCalculator(getThreadPoolSize(), scenario.getRowCount()); + List<Future> writeBatches = new ArrayList<>(); + + for (int i = 0; i < getThreadPoolSize(); i++) { + List<Column> + phxMetaCols = + pUtil.getColumnsFromPhoenix(scenario.getSchemaName(), + scenario.getTableNameWithoutSchemaName(), pUtil.getConnection()); + int threadRowCount = rowCalculator.getNext(); + logger.info( + "Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows."); + Future<Info> + write = + upsertData(scenario, phxMetaCols, scenario.getTableName(), threadRowCount, + dataLoadThreadTime); + writeBatches.add(write); + } + if (writeBatches.isEmpty()) { + throw new PherfException( + "Holy shit snacks! Throwing up hands in disbelief and exiting. Could not write data for some unknown reason."); + } + + return writeBatches; + } + + private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario, + long start, List<Future> writeBatches) + throws InterruptedException, java.util.concurrent.ExecutionException { + int sumRows = 0, sumDuration = 0; + // Wait for all the batch threads to complete + for (Future<Info> write : writeBatches) { + Info writeInfo = write.get(); + sumRows += writeInfo.getRowCount(); + sumDuration += writeInfo.getDuration(); + logger.info("Executor (" + this.hashCode() + ") writes complete with row count (" + + writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")"); + } + logger.info("Writes completed with total row count (" + sumRows + ") with total time of(" + + sumDuration + ") Ms"); + dataLoadTimeSummary + .add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start)); + } + + /** + * TODO Move this method to PhoenixUtil + * Update Phoenix table stats + * + * @param tableName + * @throws Exception + */ + public void updatePhoenixStats(String tableName) throws Exception { + logger.info("Updating stats for " + tableName); + pUtil.executeStatement("UPDATE STATISTICS " + tableName); + } + + public Future<Info> upsertData(final Scenario scenario, final List<Column> columns, + final String tableName, final int rowCount, + final DataLoadThreadTime dataLoadThreadTime) { + Future<Info> future = pool.submit(new Callable<Info>() { + @Override public Info call() throws Exception { + int rowsCreated = 0; + long start = 0, duration, totalDuration; + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Connection connection = null; + try { + connection = pUtil.getConnection(); + long logStartTime = System.currentTimeMillis(); + long + maxDuration = + (WriteWorkload.this.writeParams == null) ? + Long.MAX_VALUE : + WriteWorkload.this.writeParams.getExecutionDurationInMs(); + + for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime) + < maxDuration); i--) { + String sql = buildSql(columns, tableName); + PreparedStatement stmt = connection.prepareStatement(sql); + stmt = buildStatement(scenario, columns, stmt, simpleDateFormat); + start = System.currentTimeMillis(); + rowsCreated += stmt.executeUpdate(); + stmt.close(); + if ((i % getBatchSize()) == 0) { + connection.commit(); + duration = System.currentTimeMillis() - start; + logger.info("Writer (" + Thread.currentThread().getName() + + ") committed Batch. Total " + getBatchSize() + + " rows for this thread (" + this.hashCode() + ") in (" + + duration + ") Ms"); + + if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) { + dataLoadThreadTime + .add(tableName, Thread.currentThread().getName(), i, + System.currentTimeMillis() - logStartTime); + logStartTime = System.currentTimeMillis(); + } + + // Pause for throttling if configured to do so + Thread.sleep(threadSleepDuration); + } + } + } finally { + if (connection != null) { + try { + connection.commit(); + duration = System.currentTimeMillis() - start; + logger.info("Writer ( " + Thread.currentThread().getName() + + ") committed Final Batch. Duration (" + duration + ") Ms"); + connection.close(); + } catch (SQLException e) { + // Swallow since we are closing anyway + e.printStackTrace(); + } + } + } + totalDuration = System.currentTimeMillis() - start; + return new Info(totalDuration, rowsCreated); + } + }); + return future; + } + + private PreparedStatement buildStatement(Scenario scenario, List<Column> columns, + PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception { + int count = 1; + for (Column column : columns) { + + DataValue dataValue = getRulesApplier().getDataForRule(scenario, column); + switch (column.getType()) { + case VARCHAR: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.VARCHAR); + } else { + statement.setString(count, dataValue.getValue()); + } + break; + case CHAR: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.CHAR); + } else { + statement.setString(count, dataValue.getValue()); + } + break; + case DECIMAL: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.DECIMAL); + } else { + statement.setBigDecimal(count, new BigDecimal(dataValue.getValue())); + } + break; + case INTEGER: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.INTEGER); + } else { + statement.setInt(count, Integer.parseInt(dataValue.getValue())); + } + break; + case DATE: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.DATE); + } else { + Date + date = + new java.sql.Date( + simpleDateFormat.parse(dataValue.getValue()).getTime()); + statement.setDate(count, date); + } + break; + default: + break; + } + count++; + } + return statement; + } + + private String buildSql(final List<Column> columns, final String tableName) { + StringBuilder builder = new StringBuilder(); + builder.append("upsert into "); + builder.append(tableName); + builder.append(" ("); + int count = 1; + for (Column column : columns) { + builder.append(column.getName()); + if (count < columns.size()) { + builder.append(","); + } else { + builder.append(")"); + } + count++; + } + builder.append(" VALUES ("); + for (int i = 0; i < columns.size(); i++) { + if (i < columns.size() - 1) { + builder.append("?,"); + } else { + builder.append("?)"); + } + } + return builder.toString(); + } + + public XMLConfigParser getParser() { + return parser; + } + + public RulesApplier getRulesApplier() { + return rulesApplier; + } + + public int getBatchSize() { + return batchSize; + } + + public int getThreadPoolSize() { + return threadPoolSize; + } + + private class Info { + + private final int rowCount; + private final long duration; + + public Info(long duration, int rows) { + this.duration = duration; + this.rowCount = rows; + } + + public long getDuration() { + return duration; + } + + public int getRowCount() { + return rowCount; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml index 9514089..8f93685 100644 --- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml +++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml @@ -304,6 +304,41 @@ </column> </datamapping> <scenarios> + <scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="100" name="readWriteScenario"> + <!-- Scenario level rule overrides will be unsupported in V1. + You can use the general datamappings in the mean time--> + <dataOverride> + <column> + <type>VARCHAR</type> + <userDefined>true</userDefined> + <dataSequence>LIST</dataSequence> + <name>TENANT_ID</name> + </column> + </dataOverride> + <writeParams executionDurationInMs="10000"> + <!-- + Number of writer it insert into the threadpool + --> + <writerThreadCount>5</writerThreadCount> + + <!-- + Time in Ms that each thread will sleep between batch writes. This helps to + throttle writers. + --> + <threadSleepDuration>10</threadSleepDuration> + + <batchSize>100</batchSize> + </writeParams> + <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first --> + <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="60000" + numberOfExecutions="100"> + <!-- Aggregate queries on a per tenant basis --> + <query tenantId="00Dxx0000001gER" + ddl="CREATE VIEW IF NOT EXISTS PHERF.PHERF_TEST_VIEW_UNSALTED AS SELECT * FROM PHERF.PHERF_PROD_TEST_UNSALTED" + statement="select count(*) from PHERF.PHERF_TEST_VIEW_UNSALTED"/> + </querySet> + + </scenario> <scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="10"> <!-- Scenario level rule overrides will be unsupported in V1. You can use the general datamappings in the mean time--> http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java index f362842..6f25fbd 100644 --- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java +++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java @@ -18,6 +18,7 @@ package org.apache.phoenix.pherf; +import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; @@ -27,7 +28,6 @@ import java.util.List; import org.apache.phoenix.pherf.configuration.*; import org.apache.phoenix.pherf.rules.DataValue; -import org.apache.phoenix.pherf.workload.WorkloadExecutor; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,53 +38,55 @@ import javax.xml.bind.Marshaller; import static org.junit.Assert.*; -public class ConfigurationParserTest extends ResultBaseTest{ +public class ConfigurationParserTest extends ResultBaseTest { private static final Logger logger = LoggerFactory.getLogger(ConfigurationParserTest.class); @Test - public void testConfigFilesParsing() { - try { - WorkloadExecutor workloadExec = new WorkloadExecutor(); - List<Scenario> scenarioList = workloadExec.getParser().getScenarios(); - assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0)); - logger.info("Number of scenarios loaded: " + scenarioList.size()); - - } catch (Exception e) { - e.printStackTrace(); - fail(); + public void testReadWriteWorkloadReader() throws Exception { + String scenarioName = "testScenarioRW"; + List<Scenario> scenarioList = getScenarios(); + Scenario target = null; + for (Scenario scenario : scenarioList) { + if (scenarioName.equals(scenario.getName())) { + target = scenario; + } } + assertNotNull("Could not find scenario: " + scenarioName, target); + WriteParams params = target.getWriteParams(); + + assertNotNull("Could not find writeParams in scenario: " + scenarioName, params); + assertNotNull("Could not find batch size: ", params.getBatchSize()); + assertNotNull("Could not find execution duration: ", params.getExecutionDurationInMs()); + assertNotNull("Could not find sleep duration: ", params.getThreadSleepDuration()); + assertNotNull("Could not find writer count: ", params.getWriterThreadCount()); } - @Test + @Test // TODO Break this into multiple smaller tests. - public void testConfigReader(){ - URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml"); - assertNotNull("Test data XML file is missing", resourceUrl); - - try { + public void testConfigReader() { + try { logger.debug("DataModel: " + writeXML()); - Path resourcePath = Paths.get(resourceUrl.toURI()); - DataModel data = XMLConfigParser.readDataModel(resourcePath); - List<Scenario> scenarioList = data.getScenarios(); - assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0)); - List<Column> dataMappingColumns = data.getDataMappingColumns(); - assertTrue("Could not load the data columns from xml.", (dataMappingColumns != null) && (dataMappingColumns.size() > 0)); + List<Scenario> scenarioList = getScenarios(); + List<Column> dataMappingColumns = getDataModel().getDataMappingColumns(); + assertTrue("Could not load the data columns from xml.", + (dataMappingColumns != null) && (dataMappingColumns.size() > 0)); assertTrue("Could not load the data DataValue list from xml.", (dataMappingColumns.get(6).getDataValues() != null) - && (dataMappingColumns.get(6).getDataValues().size() > 0)); + && (dataMappingColumns.get(6).getDataValues().size() > 0)); assertDateValue(dataMappingColumns); // Validate column mappings for (Column column : dataMappingColumns) { - assertNotNull("Column ("+ column.getName() + ") is missing its type",column.getType()); + assertNotNull("Column (" + column.getName() + ") is missing its type", + column.getType()); } - Scenario scenario = scenarioList.get(0); + Scenario scenario = scenarioList.get(1); assertNotNull(scenario); assertEquals("PHERF.TEST_TABLE", scenario.getTableName()); - assertEquals(10, scenario.getRowCount()); + assertEquals(30, scenario.getRowCount()); assertEquals(1, scenario.getDataOverride().getColumn().size()); QuerySet qs = scenario.getQuerySet().get(0); assertEquals(ExecutionType.SERIAL, qs.getExecutionType()); @@ -99,27 +101,50 @@ public class ConfigurationParserTest extends ResultBaseTest{ assertEquals("select count(*) from PHERF.TEST_TABLE", firstQuery.getStatement()); assertEquals("123456789012345", firstQuery.getTenantId()); assertEquals(null, firstQuery.getDdl()); - assertEquals(0, (long)firstQuery.getExpectedAggregateRowCount()); + assertEquals(0, (long) firstQuery.getExpectedAggregateRowCount()); Query secondQuery = qs.getQuery().get(1); - assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE", secondQuery.getStatement()); + assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE", + secondQuery.getStatement()); assertEquals("Could not get queryGroup.", "g1", secondQuery.getQueryGroup()); // Make sure anything in the overrides matches a real column in the data mappings DataOverride override = scenario.getDataOverride(); for (Column column : override.getColumn()) { - assertTrue("Could not lookup Column (" + column.getName() + ") in DataMapping columns: " + dataMappingColumns, dataMappingColumns.contains(column)); + assertTrue("Could not lookup Column (" + column.getName() + + ") in DataMapping columns: " + dataMappingColumns, + dataMappingColumns.contains(column)); } - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + private URL getResourceUrl() { + URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml"); + assertNotNull("Test data XML file is missing", resourceUrl); + return resourceUrl; + } + + private List<Scenario> getScenarios() throws URISyntaxException, JAXBException{ + DataModel data = getDataModel(); + List<Scenario> scenarioList = data.getScenarios(); + assertTrue("Could not load the scenarios from xml.", + (scenarioList != null) && (scenarioList.size() > 0)); + return scenarioList; + } + + private DataModel getDataModel() throws URISyntaxException, JAXBException { + Path resourcePath = Paths.get(getResourceUrl().toURI()); + return XMLConfigParser.readDataModel(resourcePath); + } private void assertDateValue(List<Column> dataMappingColumns) { for (Column dataMapping : dataMappingColumns) { - if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName().equals("CREATED_DATE"))) { + if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName() + .equals("CREATED_DATE"))) { // First rule should have min/max set assertNotNull(dataMapping.getDataValues().get(0).getMinValue()); assertNotNull(dataMapping.getDataValues().get(0).getMaxValue()); @@ -139,7 +164,7 @@ public class ConfigurationParserTest extends ResultBaseTest{ /* Used for debugging to dump out a simple xml filed based on the bound objects. */ - private String writeXML() { + private String writeXML() { DataModel data = new DataModel(); try { DataValue dataValue = new DataValue(); @@ -156,7 +181,6 @@ public class ConfigurationParserTest extends ResultBaseTest{ List<Column> columnList = new ArrayList<>(); columnList.add(column); - data.setRelease("192"); data.setDataMappingColumns(columnList); Scenario scenario = new Scenario(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java index 4ab36fb..4ccf95c 100644 --- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java +++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java @@ -33,7 +33,6 @@ import org.apache.phoenix.pherf.result.file.ResultFileDetails; import org.apache.phoenix.pherf.result.impl.CSVResultHandler; import org.apache.phoenix.pherf.result.impl.XMLResultHandler; import org.apache.phoenix.pherf.result.*; -import org.junit.BeforeClass; import org.junit.Test; import org.apache.phoenix.pherf.configuration.Query; @@ -56,7 +55,7 @@ public class ResultTest extends ResultBaseTest { resultMonitorWriter.write(result); resultMonitorWriter.write(result); resultMonitorWriter.write(result); - resultMonitorWriter.flush(); + resultMonitorWriter.close(); List<Result> results = resultMonitorWriter.read(); assertEquals("Results did not contain row.", results.size(), 3); @@ -72,8 +71,8 @@ public class ResultTest extends ResultBaseTest { public void testMonitorResult() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); MonitorManager monitor = new MonitorManager(100); - Future future = executorService.submit(monitor); - List<Result> records = null; + Future future = executorService.submit(monitor.execute()); + List<Result> records; final int TIMEOUT = 30; int ct = 0; @@ -83,7 +82,7 @@ public class ResultTest extends ResultBaseTest { Thread.sleep(100); if (ct == max) { int timer = 0; - monitor.stop(); + monitor.complete(); while (monitor.isRunning() && (timer < TIMEOUT)) { System.out.println("Waiting for monitor to finish. Seconds Waited :" + timer); Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java index 15d4608..92604d4 100644 --- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java +++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java @@ -19,7 +19,7 @@ package org.apache.phoenix.pherf; import org.apache.phoenix.pherf.configuration.*; -import org.apache.phoenix.pherf.loaddata.DataLoader; +import org.apache.phoenix.pherf.workload.WriteWorkload; import org.apache.phoenix.pherf.rules.DataValue; import org.apache.phoenix.pherf.rules.RulesApplier; import org.apache.phoenix.pherf.util.PhoenixUtil; @@ -28,20 +28,19 @@ import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.junit.Test; -import java.sql.Types; import java.util.*; import static org.junit.Assert.*; public class RuleGeneratorTest { - static PhoenixUtil util = new PhoenixUtil(true); - static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml"; + private static PhoenixUtil util = PhoenixUtil.create(true); + private static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml"; @Test public void testDateGenerator() throws Exception { XMLConfigParser parser = new XMLConfigParser(matcherScenario); DataModel model = parser.getDataModels().get(0); - DataLoader loader = new DataLoader(parser); + WriteWorkload loader = new WriteWorkload(parser); RulesApplier rulesApplier = loader.getRulesApplier(); for (Column dataMapping : model.getDataMappingColumns()) { @@ -68,7 +67,7 @@ public class RuleGeneratorTest { public void testNullChance() throws Exception { XMLConfigParser parser = new XMLConfigParser(matcherScenario); DataModel model = parser.getDataModels().get(0); - DataLoader loader = new DataLoader(parser); + WriteWorkload loader = new WriteWorkload(parser); RulesApplier rulesApplier = loader.getRulesApplier(); int sampleSize = 100; List<String> values = new ArrayList<>(sampleSize); @@ -96,7 +95,7 @@ public class RuleGeneratorTest { public void testSequentialDataSequence() throws Exception { XMLConfigParser parser = new XMLConfigParser(matcherScenario); DataModel model = parser.getDataModels().get(0); - DataLoader loader = new DataLoader(parser); + WriteWorkload loader = new WriteWorkload(parser); RulesApplier rulesApplier = loader.getRulesApplier(); Column targetColumn = null; @@ -181,7 +180,7 @@ public class RuleGeneratorTest { expectedValues.add("cCCyYhnNbBs9kWr"); XMLConfigParser parser = new XMLConfigParser(".*test_scenario.xml"); - DataLoader loader = new DataLoader(parser); + WriteWorkload loader = new WriteWorkload(parser); RulesApplier rulesApplier = loader.getRulesApplier(); Scenario scenario = parser.getScenarios().get(0); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/src/test/resources/scenario/test_scenario.xml ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml index 45d36d2..fddf022 100644 --- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml +++ b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml @@ -127,10 +127,50 @@ <name>NEWVAL_STRING</name> <prefix>TSTPRFX</prefix> </column> - </datamapping> <scenarios> - <scenario tableName="PHERF.TEST_TABLE" rowCount="10" name="testScenario"> + <scenario tableName="PHERF.TEST_TABLE" rowCount="100" name="testScenarioRW"> + <!-- Scenario level rule overrides will be unsupported in V1. + You can use the general datamappings in the mean time--> + <dataOverride> + <column> + <type>VARCHAR</type> + <userDefined>true</userDefined> + <dataSequence>RANDOM</dataSequence> + <length>10</length> + <name>FIELD</name> + </column> + </dataOverride> + + <!-- + This is used to add mixed R/W workloads. + + If this tag exists, a writer pool will be created based on the below properties. + These props will override the default values in pherf.properties, but only for this + scenario.The write jobs will run in conjunction with the querySet below. + --> + <writeParams executionDurationInMs="10000"> + <!-- + Number of writer it insert into the threadpool + --> + <writerThreadCount>2</writerThreadCount> + + <!-- + Time in Ms that each thread will sleep between batch writes. This helps to + throttle writers. + --> + <threadSleepDuration>10</threadSleepDuration> + + <batchSize>1000</batchSize> + </writeParams> + <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000"> + <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/> + <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/> + </querySet> + + </scenario> + + <scenario tableName="PHERF.TEST_TABLE" rowCount="30" name="testScenario"> <!-- Scenario level rule overrides will be unsupported in V1. You can use the general datamappings in the mean time--> <dataOverride> @@ -145,16 +185,20 @@ <!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first 2. DDL included in query are executed only once on start of querySet execution. --> - <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000" numberOfExecutions="100"> - <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0" statement="select count(*) from PHERF.TEST_TABLE"/> + <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000" + numberOfExecutions="100"> + <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0" + statement="select count(*) from PHERF.TEST_TABLE"/> <!-- queryGroup is a way to organize queries across tables or scenario files. The value will be dumped to results. This gives a value to group by on reporting to compare queries --> - <query id="q2" queryGroup="g1" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/> + <query id="q2" queryGroup="g1" + statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/> </querySet> <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first --> - <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000" numberOfExecutions="10"> + <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000" + numberOfExecutions="10"> <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/> - <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/> + <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/> </querySet> </scenario> </scenarios> http://git-wip-us.apache.org/repos/asf/phoenix/blob/ac7dc675/phoenix-pherf/standalone/pherf.sh ---------------------------------------------------------------------- diff --git a/phoenix-pherf/standalone/pherf.sh b/phoenix-pherf/standalone/pherf.sh index e08035a..2b91d2c 100755 --- a/phoenix-pherf/standalone/pherf.sh +++ b/phoenix-pherf/standalone/pherf.sh @@ -24,5 +24,5 @@ for f in $PHERF_HOME/lib/*.jar; do CLASSPATH=${CLASSPATH}:$f; done -CMD="time $}JAVA_HOME}/bin/java ${REMOTE_DEBUG} -Dapp.home=${PHERF_HOME} ${ENV_PROPS} -Xms512m -Xmx3072m -cp ${CLASSPATH} org.apache.phoenix.pherf.Pherf ${@}" +CMD="time ${JAVA_HOME}/bin/java ${REMOTE_DEBUG} -Dapp.home=${PHERF_HOME} ${ENV_PROPS} -Xms512m -Xmx3072m -cp ${CLASSPATH} org.apache.phoenix.pherf.Pherf ${@}" eval $CMD \ No newline at end of file