This is an automated email from the ASF dual-hosted git repository. chinmayskulkarni pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new dccc260 PHOENIX-4521: Allow Pherf scenario to define per query max allowed query execution duration after which thread is interrupted dccc260 is described below commit dccc260413591a7ab3133f8040b8547b8e993750 Author: Christine Feng <chfen...@gmail.com> AuthorDate: Mon Mar 16 19:32:27 2020 -0700 PHOENIX-4521: Allow Pherf scenario to define per query max allowed query execution duration after which thread is interrupted Signed-off-by: Chinmay Kulkarni <chinmayskulka...@apache.org> --- phoenix-pherf/pom.xml | 5 + .../java/org/apache/phoenix/pherf/PherfMainIT.java | 88 +++++++++++++ .../apache/phoenix/pherf/configuration/Query.java | 11 ++ .../apache/phoenix/pherf/result/QueryResult.java | 2 + .../org/apache/phoenix/pherf/result/RunTime.java | 34 +++-- .../apache/phoenix/pherf/result/ThreadTime.java | 5 +- .../apache/phoenix/pherf/result/file/Header.java | 4 +- .../pherf/workload/MultiThreadedRunner.java | 85 +++++++++---- .../pherf/workload/MultithreadedDiffer.java | 4 +- .../java/org/apache/phoenix/pherf/ResultTest.java | 10 +- .../pherf/workload/MultiThreadedRunnerTest.java | 121 ++++++++++++++++++ .../resources/datamodel/timeout_test_schema.sql | 22 ++++ .../resources/scenario/timeout_test_scenario.xml | 138 +++++++++++++++++++++ 13 files changed, 483 insertions(+), 46 deletions(-) diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml index 9b5914e..a156318 100644 --- a/phoenix-pherf/pom.xml +++ b/phoenix-pherf/pom.xml @@ -178,6 +178,11 @@ <artifactId>hamcrest-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> <!-- Java 11 --> <dependency> <groupId>javax.activation</groupId> diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java index 3ee9327..be9b27a 100644 --- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java @@ -18,14 +18,35 @@ package org.apache.phoenix.pherf; +import org.apache.commons.lang3.StringUtils; +import org.apache.phoenix.pherf.result.Result; +import org.apache.phoenix.pherf.result.ResultValue; +import org.apache.phoenix.pherf.result.file.ResultFileDetails; +import org.apache.phoenix.pherf.result.impl.CSVFileResultHandler; import org.junit.Rule; import org.junit.Test; import org.junit.contrib.java.lang.system.ExpectedSystemExit; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.Future; +import static org.junit.Assert.assertEquals; + public class PherfMainIT extends ResultBaseTestIT { + public HashMap<String, String> mapResults(Result r) throws IOException { + HashMap<String, String> map = new HashMap<>(); + List<ResultValue> resultValues = r.getResultValues(); + String[] headerValues = r.getHeader().split(PherfConstants.RESULT_FILE_DELIMETER); + for (int i = 0; i < headerValues.length; i++) { + map.put(StringUtils.strip(headerValues[i],"[] "), + StringUtils.strip(resultValues.get(i).toString(), "[] ")); + } + return map; + } + @Rule public final ExpectedSystemExit exit = ExpectedSystemExit.none(); @@ -43,4 +64,71 @@ public class PherfMainIT extends ResultBaseTestIT { future.get(); } } + + @Test + public void testQueryTimeout() throws Exception { + // Timeout of 0 ms means every query execution should time out + String[] args = {"-q", "-l", + "-drop", "all", + "-schemaFile", ".*timeout_test_schema.sql", + "-scenarioFile", ".*timeout_test_scenario.xml" }; + Pherf p = new Pherf(args); + p.run(); + + CSVFileResultHandler rh = new CSVFileResultHandler(); + rh.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE); + rh.setResultFileName("COMBINED"); + List<Result> resultList = rh.read(); + for (Result r : resultList) { + HashMap<String, String> resultsMap = mapResults(r); + if (resultsMap.get("QUERY_ID").equals("q1")) { + assertEquals(resultsMap.get("TIMED_OUT"), "true"); + } + } + } + + @Test + public void testLargeQueryTimeout() throws Exception { + // Timeout of max_long ms means every query execution should finish without timing out + String[] args = {"-q", "-l", + "-drop", "all", + "-schemaFile", ".*timeout_test_schema.sql", + "-scenarioFile", ".*timeout_test_scenario.xml" }; + Pherf p = new Pherf(args); + p.run(); + + CSVFileResultHandler rh = new CSVFileResultHandler(); + rh.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE); + rh.setResultFileName("COMBINED"); + List<Result> resultList = rh.read(); + for (Result r : resultList) { + HashMap<String, String> resultsMap = mapResults(r); + if (resultsMap.get("QUERY_ID").equals("q2")) { + assertEquals(resultsMap.get("TIMED_OUT"), "false"); + } + } + } + + @Test + public void testNoQueryTimeout() throws Exception { + // Missing timeout attribute means every query execution should finish without timing out + String[] args = {"-q", "-l", + "-drop", "all", + "-schemaFile", ".*timeout_test_schema.sql", + "-scenarioFile", ".*timeout_test_scenario.xml" }; + Pherf p = new Pherf(args); + p.run(); + + CSVFileResultHandler rh = new CSVFileResultHandler(); + rh.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE); + rh.setResultFileName("COMBINED"); + List<Result> resultList = rh.read(); + for (Result r : resultList) { + HashMap<String, String> resultsMap = mapResults(r); + if (resultsMap.get("QUERY_ID").equals("q3")) { + assertEquals(resultsMap.get("TIMED_OUT"), "false"); + } + } + } + } \ No newline at end of file diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java index e283715..5f28134 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java @@ -36,6 +36,7 @@ public class Query { private String queryGroup; private String id; private Pattern pattern; + private long timeoutDuration = Long.MAX_VALUE; public Query() { pattern = Pattern.compile("\\[.*?\\]"); @@ -158,4 +159,14 @@ public class Query { public void setId(String id) { this.id = id; } + + + @XmlAttribute + public long getTimeoutDuration() { + return this.timeoutDuration; + } + + public void setTimeoutDuration(long timeoutDuration) { + this.timeoutDuration = timeoutDuration; + } } diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java index cef24f4..228d003 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java @@ -54,6 +54,7 @@ public class QueryResult extends Query { this.setDdl(query.getDdl()); this.setQueryGroup(query.getQueryGroup()); this.setId(query.getId()); + this.setTimeoutDuration(query.getTimeoutDuration()); } public Date getStartTime() { @@ -150,6 +151,7 @@ public class QueryResult extends Query { List<ResultValue> rowValues = new ArrayList<>(); rowValues.add(new ResultValue(util.convertNull(getStartTimeText()))); rowValues.add(new ResultValue(util.convertNull(this.getQueryGroup()))); + rowValues.add(new ResultValue(util.convertNull(this.getId()))); rowValues.add(new ResultValue(util.convertNull(this.getStatement()))); rowValues.add(new ResultValue(util.convertNull(this.getTenantId()))); rowValues.addAll(runTime); diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java index 3aa45fa..59bd265 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java @@ -24,38 +24,44 @@ import java.util.Date; public class RunTime implements Comparator<RunTime>, Comparable<RunTime> { private Date startTime; - private Integer elapsedDurationInMs; + private Long elapsedDurationInMs; private String message; private Long resultRowCount; private String explainPlan; + private boolean timedOut; @SuppressWarnings("unused") public RunTime() { } - @SuppressWarnings("unused") public RunTime(Integer elapsedDurationInMs) { + @SuppressWarnings("unused") public RunTime(Long elapsedDurationInMs) { this(null, elapsedDurationInMs); } - public RunTime(Long resultRowCount, Integer elapsedDurationInMs) { + public RunTime(Long resultRowCount, Long elapsedDurationInMs) { this(null, resultRowCount, elapsedDurationInMs); } - public RunTime(Date startTime, Long resultRowCount, Integer elapsedDurationInMs) { - this(null, null, startTime, resultRowCount, elapsedDurationInMs); + public RunTime(Date startTime, Long resultRowCount, Long elapsedDurationInMs) { + this(null, null, startTime, resultRowCount, elapsedDurationInMs, false); + } + + public RunTime(Date startTime, Long elapsedDurationInMs, boolean timedOut) { + this(null, startTime, null, elapsedDurationInMs, timedOut); } public RunTime(String message, Date startTime, Long resultRowCount, - Integer elapsedDurationInMs) { - this(message, null, startTime, resultRowCount, elapsedDurationInMs); + Long elapsedDurationInMs, boolean timedOut) { + this(message, null, startTime, resultRowCount, elapsedDurationInMs, timedOut); } public RunTime(String message, String explainPlan, Date startTime, Long resultRowCount, - Integer elapsedDurationInMs) { + Long elapsedDurationInMs, boolean timedOut) { this.elapsedDurationInMs = elapsedDurationInMs; this.startTime = startTime; this.resultRowCount = resultRowCount; this.message = message; this.explainPlan = explainPlan; + this.timedOut = timedOut; } @XmlAttribute() public Date getStartTime() { @@ -66,11 +72,11 @@ public class RunTime implements Comparator<RunTime>, Comparable<RunTime> { this.startTime = startTime; } - @XmlAttribute() public Integer getElapsedDurationInMs() { + @XmlAttribute() public Long getElapsedDurationInMs() { return elapsedDurationInMs; } - @SuppressWarnings("unused") public void setElapsedDurationInMs(Integer elapsedDurationInMs) { + @SuppressWarnings("unused") public void setElapsedDurationInMs(Long elapsedDurationInMs) { this.elapsedDurationInMs = elapsedDurationInMs; } @@ -105,4 +111,12 @@ public class RunTime implements Comparator<RunTime>, Comparable<RunTime> { @SuppressWarnings("unused") public void setResultRowCount(Long resultRowCount) { this.resultRowCount = resultRowCount; } + + @SuppressWarnings("unused") public void setTimedOut(boolean timedOut) { + this.timedOut = timedOut; + } + + @XmlAttribute() public boolean getTimedOut() { + return this.timedOut; + } } \ No newline at end of file diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java index 03b5664..e1e7652 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java @@ -69,13 +69,13 @@ public class ThreadTime { public Integer getAvgTimeInMs() { if (getRunTimesInMs().isEmpty()) return null; - Integer totalTimeInMs = new Integer(0); + Long totalTimeInMs = new Long(0); for (RunTime runTime : getRunTimesInMs()) { if (null != runTime.getElapsedDurationInMs()) { totalTimeInMs += runTime.getElapsedDurationInMs(); } } - return totalTimeInMs / getRunTimesInMs().size(); + return (int) (totalTimeInMs / getRunTimesInMs().size()); } public RunTime getMaxTimeInMs() { @@ -117,6 +117,7 @@ public class ThreadTime { rowValues.add(new ResultValue( util.convertNull(getRunTimesInMs().get(i).getMessage()))); } + rowValues.add(new ResultValue(getRunTimesInMs().get(i).getTimedOut())); rows.add(rowValues); } return rows; diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java index 7d09f68..c888199 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java @@ -24,8 +24,8 @@ public enum Header { "START_TIME,QUERY_GROUP,QUERY,TENANT_ID,AVG_MAX_TIME_MS,AVG_TIME_MS,AVG_MIN_TIME_MS,RUN_COUNT,EXPLAIN_PLAN,RESULT_ROW_COUNT"), DETAILED_BASE( "BASE_TABLE_NAME,SCENARIO_NAME,ZOOKEEPER,ROW_COUNT,EXECUTION_COUNT,EXECUTION_TYPE,PHOENIX_PROPERTIES" - + ",START_TIME,QUERY_GROUP,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"), - DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"), + + ",START_TIME,QUERY_GROUP,QUERY_ID,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"), + DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS,TIMED_OUT"), DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"), AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"), THIN_AGGREGATE_DATA_LOAD("QUERYSERVER,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"), diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java index 9fcc38e..41e6045 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java @@ -25,6 +25,8 @@ import java.util.Calendar; import java.util.Date; import java.util.concurrent.Callable; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.pherf.result.DataModelResult; import org.apache.phoenix.pherf.result.ResultManager; import org.apache.phoenix.pherf.result.RunTime; @@ -88,13 +90,21 @@ class MultiThreadedRunner implements Callable<Void> { */ @Override public Void call() throws Exception { - LOGGER.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for " - + numberOfExecutions + "times\n\n"); - Long start = EnvironmentEdgeManager.currentTimeMillis(); - for (long i = numberOfExecutions; (i > 0 && ((EnvironmentEdgeManager.currentTimeMillis() - start) - < executionDurationInMs)); i--) { + LOGGER.info("\n\nThread Starting " + threadName + " ; '" + query.getStatement() + "' for " + + numberOfExecutions + " times\n\n"); + long threadStartTime = EnvironmentEdgeManager.currentTimeMillis(); + for (long i = 0; i < numberOfExecutions; i++) { + long threadElapsedTime = EnvironmentEdgeManager.currentTimeMillis() - threadStartTime; + if (threadElapsedTime >= executionDurationInMs) { + LOGGER.info("Queryset timeout of " + executionDurationInMs + " ms reached; current time is " + threadElapsedTime + " ms." + + "\nStopping queryset execution for query " + query.getId() + " on thread " + threadName + "..."); + break; + } + synchronized (workloadExecutor) { - timedQuery(); + if (!timedQuery(i+1)) { + break; + } if ((EnvironmentEdgeManager.currentTimeMillis() - lastResultWritten) > 1000) { resultManager.write(dataModelResult, ruleApplier); lastResultWritten = EnvironmentEdgeManager.currentTimeMillis(); @@ -119,8 +129,9 @@ class MultiThreadedRunner implements Callable<Void> { * Timed query execution * * @throws Exception + * @returns boolean true if query finished without timing out; false otherwise */ - private void timedQuery() throws Exception { + private boolean timedQuery(long iterationNumber) throws Exception { boolean isSelectCountStatement = query.getStatement().toUpperCase().trim().contains("COUNT(") ? true : false; @@ -128,17 +139,19 @@ class MultiThreadedRunner implements Callable<Void> { Connection conn = null; PreparedStatement statement = null; ResultSet rs = null; - Long start = EnvironmentEdgeManager.currentTimeMillis(); + Long queryStartTime = EnvironmentEdgeManager.currentTimeMillis(); Date startDate = Calendar.getInstance().getTime(); String exception = null; - long resultRowCount = 0; + Long resultRowCount = 0L; + String queryIteration = threadName + ":" + iterationNumber; + Long queryElapsedTime = 0L; try { conn = pUtil.getConnection(query.getTenantId(), scenario.getPhoenixProperties()); conn.setAutoCommit(true); final String statementString = query.getDynamicStatement(ruleApplier, scenario); statement = conn.prepareStatement(statementString); - LOGGER.info("Executing: " + statementString); + LOGGER.info("Executing iteration: " + queryIteration + ": " + statementString); if (scenario.getWriteParams() != null) { Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, scenario, GeneratePhoenixStats.NO); @@ -148,34 +161,54 @@ class MultiThreadedRunner implements Callable<Void> { boolean isQuery = statement.execute(); if (isQuery) { rs = statement.getResultSet(); - while (rs.next()) { - if (null != query.getExpectedAggregateRowCount()) { - if (rs.getLong(1) != query.getExpectedAggregateRowCount()) - throw new RuntimeException( - "Aggregate count " + rs.getLong(1) + " does not match expected " - + query.getExpectedAggregateRowCount()); - } - - if (isSelectCountStatement) { - resultRowCount = rs.getLong(1); - } else { - resultRowCount++; - } - } + Pair<Long, Long> r = getResults(rs, queryIteration, isSelectCountStatement, queryStartTime); + resultRowCount = r.getFirst(); + queryElapsedTime = r.getSecond(); } else { conn.commit(); } } catch (Exception e) { - LOGGER.error("Exception while executing query", e); + LOGGER.error("Exception while executing query iteration " + queryIteration, e); exception = e.getMessage(); throw e; } finally { getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount, - (int) (EnvironmentEdgeManager.currentTimeMillis() - start))); + queryElapsedTime, queryElapsedTime > query.getTimeoutDuration())); if (rs != null) rs.close(); if (statement != null) statement.close(); if (conn != null) conn.close(); } + return true; } + + @VisibleForTesting + /** + * @return a Pair whose first value is the resultRowCount, and whose second value is whether the query timed out. + */ + Pair<Long, Long> getResults(ResultSet rs, String queryIteration, boolean isSelectCountStatement, Long queryStartTime) throws Exception { + Long resultRowCount = 0L; + while (rs.next()) { + if (null != query.getExpectedAggregateRowCount()) { + if (rs.getLong(1) != query.getExpectedAggregateRowCount()) + throw new RuntimeException( + "Aggregate count " + rs.getLong(1) + " does not match expected " + + query.getExpectedAggregateRowCount()); + } + + if (isSelectCountStatement) { + resultRowCount = rs.getLong(1); + } else { + resultRowCount++; + } + long queryElapsedTime = EnvironmentEdgeManager.currentTimeMillis() - queryStartTime; + if (queryElapsedTime >= query.getTimeoutDuration()) { + LOGGER.error("Query " + queryIteration + " exceeded timeout of " + + query.getTimeoutDuration() + " ms at " + queryElapsedTime + " ms."); + return new Pair(resultRowCount, queryElapsedTime); + } + } + return new Pair(resultRowCount, EnvironmentEdgeManager.currentTimeMillis() - queryStartTime); + } + } \ No newline at end of file diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java index e3480dd..6f43ee4 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java @@ -22,6 +22,7 @@ import java.util.Calendar; import java.util.Date; import java.util.concurrent.Callable; +import com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.pherf.PherfConstants; import org.apache.phoenix.pherf.configuration.Query; import org.apache.phoenix.pherf.result.RunTime; @@ -56,10 +57,11 @@ class MultithreadedDiffer implements Callable<Void> { String newCSV = queryVerifier.exportCSV(query); boolean verifyResult = queryVerifier.doDiff(query, newCSV); String explainPlan = pUtil.getExplainPlan(query); + long elapsedTime = EnvironmentEdgeManager.currentTimeMillis() - start; getThreadTime().getRunTimesInMs().add(new RunTime( verifyResult == true ? PherfConstants.DIFF_PASS : PherfConstants.DIFF_FAIL, explainPlan, startDate, -1L, - (int) (EnvironmentEdgeManager.currentTimeMillis() - start))); + elapsedTime, !(elapsedTime >= executionDurationInMs))); } /** diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java index 28e8c62..9cad1f1 100644 --- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java +++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java @@ -137,8 +137,8 @@ public class ResultTest extends ResultBaseTest { ThreadTime ttFromFile = queryResultFromFile.getThreadTimes().get(0); // thread level verification - assertEquals(10, (int) ttFromFile.getMinTimeInMs().getElapsedDurationInMs()); - assertEquals(30, (int) ttFromFile.getMaxTimeInMs().getElapsedDurationInMs()); + assertEquals(new Long(10), ttFromFile.getMinTimeInMs().getElapsedDurationInMs()); + assertEquals(new Long(30), ttFromFile.getMaxTimeInMs().getElapsedDurationInMs()); assertEquals(20, (int) ttFromFile.getAvgTimeInMs()); // 3rd runtime has the earliest start time, therefore that's what's expected. @@ -190,13 +190,13 @@ public class ResultTest extends ResultBaseTest { tt.setThreadName("thread1"); Calendar calendar = Calendar.getInstance(); Date startTime1 = calendar.getTime(); - RunTime runtime1 = new RunTime(startTime1, 1000L, 10); + RunTime runtime1 = new RunTime(startTime1, 1000L, new Long(10)); tt.getRunTimesInMs().add(runtime1); calendar.add(Calendar.MINUTE, -1); - RunTime runtime2 = new RunTime(calendar.getTime(), 2000L, 20); + RunTime runtime2 = new RunTime(calendar.getTime(), 2000L, new Long(20)); tt.getRunTimesInMs().add(runtime2); calendar.add(Calendar.MINUTE, -1); - RunTime runtime3 = new RunTime(calendar.getTime(), 3000L, 30); + RunTime runtime3 = new RunTime(calendar.getTime(), 3000L, new Long(30)); tt.getRunTimesInMs().add(runtime3); queryResult.getThreadTimes().add(tt); queryResult2.getThreadTimes().add(tt); diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/MultiThreadedRunnerTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/MultiThreadedRunnerTest.java new file mode 100644 index 0000000..d9c7ca3 --- /dev/null +++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/MultiThreadedRunnerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.workload; + +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.pherf.configuration.Query; +import org.apache.phoenix.pherf.configuration.Scenario; +import org.apache.phoenix.pherf.configuration.XMLConfigParser; +import org.apache.phoenix.pherf.result.DataModelResult; +import org.apache.phoenix.pherf.result.ThreadTime; +import org.apache.phoenix.pherf.rules.RulesApplier; +import org.apache.phoenix.util.DefaultEnvironmentEdge; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.sql.ResultSet; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class MultiThreadedRunnerTest { + @Mock + private static XMLConfigParser mockParser; + @Mock + private static DataModelResult mockDMR; + @Mock + private static RulesApplier mockRA; + @Mock + private static ThreadTime mockTT; + @Mock + private static Scenario mockScenario; + @Mock + private static WorkloadExecutor mockWE; + @Mock + private static Query mockQuery; + @Mock + private static ResultSet mockRS; + + @Before + public void init() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testExpectedRowsMismatch() throws Exception { + Mockito.when(mockQuery.getExpectedAggregateRowCount()).thenReturn(1L); + MultiThreadedRunner mtr = new MultiThreadedRunner("test", + mockQuery, mockDMR, mockTT, + 10L, 1000L, + true, mockRA, + mockScenario, mockWE, mockParser); + Mockito.when(mockRS.next()).thenReturn(true); + Mockito.when(mockRS.getLong(1)).thenReturn(2L); + try { + mtr.getResults(mockRS, "test_iteration", false,0L); + fail(); + } catch (RuntimeException e) { + //pass; + } + + } + + @Test + public void testTimeout() throws Exception { + Mockito.when(mockQuery.getTimeoutDuration()).thenReturn(1000L); + Mockito.when(mockQuery.getExpectedAggregateRowCount()).thenReturn(1L); + MultiThreadedRunner mtr = new MultiThreadedRunner("test", + mockQuery, mockDMR, mockTT, + 10L, 1000L, + true, mockRA, + mockScenario, mockWE, mockParser); + DefaultEnvironmentEdge myClock = Mockito.mock(DefaultEnvironmentEdge.class); + Mockito.when(myClock.currentTime()).thenReturn(0L, 5000L); + EnvironmentEdgeManager.injectEdge(myClock); + Mockito.when(mockRS.next()).thenReturn(true); + Mockito.when(mockRS.getLong(1)).thenReturn(1L); + Pair<Long, Long> results = mtr.getResults(mockRS, "test_iteration", false,0L); + assertTrue(results.getSecond()>mockQuery.getTimeoutDuration()); + } + + @Test + public void testFinishWithoutTimeout() throws Exception { + DefaultEnvironmentEdge myClock = Mockito.mock(DefaultEnvironmentEdge.class); + Mockito.when(myClock.currentTime()).thenReturn(0L); + EnvironmentEdgeManager.injectEdge(myClock); + Mockito.when(mockQuery.getTimeoutDuration()).thenReturn(1000L); + Mockito.when(mockQuery.getExpectedAggregateRowCount()).thenReturn(1L); + MultiThreadedRunner mtr = new MultiThreadedRunner("test", + mockQuery, mockDMR, mockTT, + 10L, 1000L, + true, mockRA, + mockScenario, mockWE, mockParser); + Mockito.when(mockRS.next()).thenReturn(true, false); + Mockito.when(mockRS.getLong(1)).thenReturn(1L); + Pair<Long, Long> results = mtr.getResults(mockRS, "test_iteration", false, 0L); + assertFalse(results.getSecond() > mockQuery.getTimeoutDuration()); + } + +} diff --git a/phoenix-pherf/src/test/resources/datamodel/timeout_test_schema.sql b/phoenix-pherf/src/test/resources/datamodel/timeout_test_schema.sql new file mode 100644 index 0000000..e753ddf --- /dev/null +++ b/phoenix-pherf/src/test/resources/datamodel/timeout_test_schema.sql @@ -0,0 +1,22 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + CREATE TABLE IF NOT EXISTS PHERF.USER_DEFINED_TEST ( + TENANT_ID VARCHAR NOT NULL PRIMARY KEY, + CREATED_DATE DATE, + VAL_STRING VARCHAR +) \ No newline at end of file diff --git a/phoenix-pherf/src/test/resources/scenario/timeout_test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/timeout_test_scenario.xml new file mode 100644 index 0000000..0fe2463 --- /dev/null +++ b/phoenix-pherf/src/test/resources/scenario/timeout_test_scenario.xml @@ -0,0 +1,138 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<datamodel release="192" name="test_scenario"> + <datamapping> + <column> + <!-- This column type defines what will generally happen to VARCHAR fields unless they are explicitly defined or overridden elsewhere --> + <type>VARCHAR</type> + <dataSequence>SEQUENTIAL</dataSequence> + <length>15</length> + <name>GENERAL_VARCHAR</name> + </column> + <column> + <type>CHAR</type> + <dataSequence>SEQUENTIAL</dataSequence> + <length>15</length> + <name>GENERAL_CHAR</name> + </column> + <column> + <type>DATE</type> + <!--SEQUENTIAL is unsupported for DATE --> + <dataSequence>RANDOM</dataSequence> + <!-- Number [0-100] that represents the probability of creating a null value --> + <!-- The higher the number, the more like the value will returned will be null --> + <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null --> + <nullChance>0</nullChance> + <minValue>1975</minValue> + <maxValue>2025</maxValue> + <name>GENERAL_DATE</name> + </column> + <column> + <type>DECIMAL</type> + <dataSequence>RANDOM</dataSequence> + <minValue>0</minValue> + <maxValue>1</maxValue> + + <!-- Precision is limited to 18 --> + <precision>18</precision> + <!-- Number [0-100] that represents the probability of creating a null value --> + <!-- The higher the number, the more like the value will returned will be null --> + <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null --> + <nullChance>10</nullChance> + <name>GENERAL_DECIMAL</name> + </column> + <column> + <type>INTEGER</type> + <dataSequence>RANDOM</dataSequence> + <minValue>1</minValue> + <maxValue>50000000</maxValue> + <!-- Number [0-100] that represents the probability of creating a null value --> + <!-- The higher the number, the more like the value will returned will be null --> + <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null --> + <name>GENERAL_INTEGER</name> + </column> + <column> + <type>DATE</type> + <name>CREATED_DATE</name> + <minValue>1975</minValue> + <maxValue>2025</maxValue> + <valuelist> + <!-- Distributes randomly with equal chance of being picked --> + <datavalue distribution="80"> + <!-- Joda time format: yyyy-MM-dd HH:mm:ss.SSS ZZZ --> + <minValue>2019-09-15 00:01:00.000</minValue> + <maxValue>2019-09-15 11:00:00.000</maxValue> + </datavalue> + <datavalue distribution="10"> + <value>2019-09-19 00:01:00.000</value> + </datavalue> + <datavalue distribution="10"> + <minValue>2019-09-22 00:01:00.000</minValue> + <maxValue>2019-09-22 00:01:00.300</maxValue> + </datavalue> + </valuelist> + </column> + <column> + <type>CHAR</type> + <userDefined>true</userDefined> + <dataSequence>LIST</dataSequence> + <length>15</length> + <name>VAL_STRING</name> + <valuelist> + <!-- Distributes randomly with equal chance of being picked --> + <datavalue distribution="50"> + <value>KjhoOmnNbBs9kWs</value> + </datavalue> + <datavalue distribution="50"> + <value>VAL123</value> + </datavalue> + </valuelist> + </column> + </datamapping> + <scenarios> + <scenario tableName="PHERF.USER_DEFINED_TEST" rowCount="100000" name="myscenario"> + <!-- Scenario level rule overrides will be unsupported in V1. + You can use the general datamappings in the mean time--> + <dataOverride> + <column> + <type>VARCHAR</type> + <userDefined>true</userDefined> + <dataSequence>RANDOM</dataSequence> + <length>10</length> + <name>DO_NOT_USE</name> + </column> + </dataOverride> + <!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first + 2. DDL included in query are executed only once on start of querySet execution. + --> + <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000" numberOfExecutions="10"> + <!-- queryGroup is a way to organize queries across tables or scenario files. + The value will be dumped to results. This gives a value to group by on reporting to compare queries --> + <query id="q1" expectedAggregateRowCount="100000" timeoutDuration="0" + statement="SELECT COUNT(*) FROM PHERF.USER_DEFINED_TEST"/> + <query id="q2" expectedAggregateRowCount="100000" timeoutDuration="9223372036854775807" + statement="SELECT COUNT(*) FROM PHERF.USER_DEFINED_TEST"/> + <query id="q3" expectedAggregateRowCount="100000" + statement="SELECT COUNT(*) FROM PHERF.USER_DEFINED_TEST"/> + + </querySet> + </scenario> + </scenarios> +</datamodel>