This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0171f99ee1 [multistage] restructure runner test (#9489)
0171f99ee1 is described below
commit 0171f99ee1feaeb11bd84e002fc4797991a07d39
Author: Rong Rong <[email protected]>
AuthorDate: Thu Sep 29 19:16:44 2022 -0700
[multistage] restructure runner test (#9489)
* fix dispatcher/server shutdown
* fix lint
Co-authored-by: Rong Rong <[email protected]>
---
.../QueryRunnerTest.java => QueryTestSet.java} | 146 +----------------
.../pinot/query/runtime/QueryRunnerTest.java | 176 +--------------------
.../pinot/query/runtime/QueryRunnerTestBase.java | 3 +-
.../pinot/query/service/QueryDispatcherTest.java | 18 +--
.../pinot/query/service/QueryServerTest.java | 23 +--
5 files changed, 19 insertions(+), 347 deletions(-)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
similarity index 66%
copy from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
copy to pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
index 872eec4313..31d1670861 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java
@@ -16,136 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.service.QueryDispatcher;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-
-public class QueryRunnerTest extends QueryRunnerTestBase {
-
- @Test(dataProvider = "testDataWithSqlToFinalRowCount")
- public void testSqlWithFinalRowCountChecker(String sql, int expectedRows)
- throws Exception {
- List<Object[]> resultRows = queryRunner(sql);
- Assert.assertEquals(resultRows.size(), expectedRows);
- }
-
- @Test(dataProvider = "testDataWithSql")
- public void testSqlWithH2Checker(String sql)
- throws Exception {
- List<Object[]> resultRows = queryRunner(sql);
- // query H2 for data
- List<Object[]> expectedRows = queryH2(sql);
- compareRowEquals(resultRows, expectedRows);
- }
+package org.apache.pinot.query;
- private List<Object[]> queryRunner(String sql) {
- QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
- Map<String, String> requestMetadataMap =
- ImmutableMap.of("REQUEST_ID",
String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
- MailboxReceiveOperator mailboxReceiveOperator = null;
- for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
- if (queryPlan.getQueryStageMap().get(stageId) instanceof
MailboxReceiveNode) {
- MailboxReceiveNode reduceNode = (MailboxReceiveNode)
queryPlan.getQueryStageMap().get(stageId);
- mailboxReceiveOperator =
QueryDispatcher.createReduceStageOperator(_mailboxService,
-
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
- Long.parseLong(requestMetadataMap.get("REQUEST_ID")),
reduceNode.getSenderStageId(),
- reduceNode.getDataSchema(), "localhost", _reducerGrpcPort);
- } else {
- for (ServerInstance serverInstance :
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
- DistributedStagePlan distributedStagePlan =
- QueryDispatcher.constructDistributedStagePlan(queryPlan,
stageId, serverInstance);
- _servers.get(serverInstance).processQuery(distributedStagePlan,
requestMetadataMap);
- }
- }
- }
- Preconditions.checkNotNull(mailboxReceiveOperator);
- return
QueryDispatcher.toResultTable(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator),
- queryPlan.getQueryResultFields()).getRows();
- }
+import org.testng.annotations.DataProvider;
- private List<Object[]> queryH2(String sql)
- throws Exception {
- Statement h2statement =
_h2Connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
- h2statement.execute(sql);
- ResultSet h2ResultSet = h2statement.getResultSet();
- int columnCount = h2ResultSet.getMetaData().getColumnCount();
- List<Object[]> result = new ArrayList<>();
- while (h2ResultSet.next()) {
- Object[] row = new Object[columnCount];
- for (int i = 0; i < columnCount; i++) {
- row[i] = h2ResultSet.getObject(i + 1);
- }
- result.add(row);
- }
- return result;
- }
- private void compareRowEquals(List<Object[]> resultRows, List<Object[]>
expectedRows) {
- Assert.assertEquals(resultRows.size(), expectedRows.size());
-
- Comparator<Object> valueComp = (l, r) -> {
- if (l == null && r == null) {
- return 0;
- } else if (l == null) {
- return -1;
- } else if (r == null) {
- return 1;
- }
- if (l instanceof Integer) {
- return Integer.compare((Integer) l, ((Number) r).intValue());
- } else if (l instanceof Long) {
- return Long.compare((Long) l, ((Number) r).longValue());
- } else if (l instanceof Float) {
- return Float.compare((Float) l, ((Number) r).floatValue());
- } else if (l instanceof Double) {
- return Double.compare((Double) l, ((Number) r).doubleValue());
- } else if (l instanceof String) {
- return ((String) l).compareTo((String) r);
- } else {
- throw new RuntimeException("non supported type");
- }
- };
- Comparator<Object[]> rowComp = (l, r) -> {
- int cmp = 0;
- for (int i = 0; i < l.length; i++) {
- cmp = valueComp.compare(l[i], r[i]);
- if (cmp != 0) {
- return cmp;
- }
- }
- return 0;
- };
- resultRows.sort(rowComp);
- expectedRows.sort(rowComp);
- for (int i = 0; i < resultRows.size(); i++) {
- Object[] resultRow = resultRows.get(i);
- Object[] expectedRow = expectedRows.get(i);
- for (int j = 0; j < resultRow.length; j++) {
- Assert.assertEquals(valueComp.compare(resultRow[j], expectedRow[j]), 0,
- "Not match at (" + i + "," + j + ")! Expected: " + expectedRow[j]
+ " Actual: " + resultRow[j]);
- }
- }
- }
+public class QueryTestSet {
- @DataProvider(name = "testDataWithSql")
- private Object[][] provideTestSql() {
+ @DataProvider(name = "testSql")
+ public Object[][] provideTestSql() {
return new Object[][]{
// Order BY LIMIT
new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"},
@@ -317,19 +196,4 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
new Object[]{"SELECT col1, COUNT(col3) FROM a GROUP BY col1 HAVING
SUM(col3) > 40 AND SUM(col3) < 30"},
};
}
-
- @DataProvider(name = "testDataWithSqlToFinalRowCount")
- private Object[][] provideTestSqlAndRowCount() {
- return new Object[][] {
- // using join clause
- new Object[]{"SELECT * FROM a JOIN b USING (col1)", 15},
-
- // test dateTrunc
- // - on leaf stage
- new Object[]{"SELECT dateTrunc('DAY', ts) FROM a LIMIT 10", 15},
- // - on intermediate stage
- new Object[]{"SELECT dateTrunc('DAY', round(a.ts, b.ts)) FROM a JOIN b
"
- + "ON a.col1 = b.col1 AND a.col2 = b.col2", 15},
- };
- }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 872eec4313..e1f685a363 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -46,7 +46,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
Assert.assertEquals(resultRows.size(), expectedRows);
}
- @Test(dataProvider = "testDataWithSql")
+ @Test(dataProvider = "testSql")
public void testSqlWithH2Checker(String sql)
throws Exception {
List<Object[]> resultRows = queryRunner(sql);
@@ -144,180 +144,6 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
}
}
- @DataProvider(name = "testDataWithSql")
- private Object[][] provideTestSql() {
- return new Object[][]{
- // Order BY LIMIT
- new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"},
- new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 10"},
- new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 20"},
-
- // No match filter
- new Object[]{"SELECT * FROM b WHERE col3 < 0.5"},
-
- // Hybrid table
- new Object[]{"SELECT * FROM d"},
-
- // Specifically table A has 15 rows (10 on server1 and 5 on server2)
and table B has 5 rows (all on server1),
- // thus the final JOIN result will be 15 x 1 = 15.
- // Next join with table C which has (5 on server1 and 10 on server2),
since data is identical. each of the row
- // of the A JOIN B will have identical value of col3 as table C.col3
has. Since the values are cycling between
- // (1, 42, 1, 42, 1). we will have 9 1s, and 6 42s, total result count
will be 9 * 9 + 6 * 6 = 117
- new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON
a.col3 = c.col3"},
- // Reverse the order of join condition and join table order.
- new Object[]{"SELECT * FROM a JOIN b ON b.col1 = a.col1 JOIN c ON
c.col3 = a.col3"},
-
- // Specifically table A has 15 rows (10 on server1 and 5 on server2)
and table B has 5 rows (all on server1),
- // thus the final JOIN result will be 15 x 1 = 15.
- new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1"},
-
- // Query with function in JOIN keys, table A and B are both (1, 42, 1,
42, 1), with table A cycling 3 times.
- // Because:
- // - MOD(a.col3, 2) will have 6 (42)s equal to 0 and 9 (1)s equals
to 1
- // - MOD(b.col3, 3) will have 2 (42)s equal to 0 and 3 (1)s equals
to 1;
- // final results are 6 * 2 + 9 * 3 = 39 rows
- new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON
MOD(a.col3, 2) = MOD(b.col3, 3)"},
-
- // Specifically table A has 15 rows (10 on server1 and 5 on server2)
and table B has 5 rows (all on server1),
- // thus the final JOIN result will be 15 x 1 = 15.
- new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 =
b.col2"},
- // Reverse the order of join condition and join table order.
- new Object[]{"SELECT * FROM a JOIN b on b.col1 = a.col1 AND b.col2 =
a.col2"},
-
- // LEFT JOIN
- new Object[]{"SELECT * FROM a LEFT JOIN b on a.col1 = b.col2"},
-
- new Object[]{"SELECT a.col1, SUM(CASE WHEN b.col3 IS NULL THEN 0 ELSE
b.col3 END) "
- + " FROM a LEFT JOIN b on a.col1 = b.col2 GROUP BY a.col1"},
-
- // Specifically table A has 15 rows (10 on server1 and 5 on server2)
and table B has 5 rows (all on server1),
- // but only 1 out of 5 rows from table A will be selected out; and all
in table B will be selected.
- // thus the final JOIN result will be 1 x 3 x 1 = 3.
- new Object[]{"SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON
a.col1 = b.col2 "
- + " WHERE a.col3 >= 0 AND a.col2 = 'alice' AND b.col3 >= 0"},
-
- // Join query with IN and Not-IN clause. Table A's side of join will
return 9 rows and Table B's side will
- // return 2 rows. Join will be only on col1=bar and since A will
return 3 rows with that value and B will return
- // 1 row, the final output will have 3 rows.
- new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
- + " WHERE a.col1 IN ('foo', 'bar', 'alice') AND b.col2 NOT IN
('foo', 'alice')"},
-
- // Same query as above but written using OR/AND instead of IN.
- new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
- + " WHERE (a.col1 = 'foo' OR a.col1 = 'bar' OR a.col1 = 'alice')
AND b.col2 != 'foo'"
- + " AND b.col2 != 'alice'"},
-
- // Same as above but with single argument IN clauses. Left side of the
join returns 3 rows, and the right side
- // returns 5 rows. Only key where join succeeds is col1=foo, and since
table B has only 1 row with that value,
- // the number of rows should be 3.
- new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
- + " WHERE a.col1 IN ('foo') AND b.col2 NOT IN ('')"},
-
- // Range conditions with continuous and non-continuous range.
- new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
- + " WHERE a.col3 IN (1, 2, 3) OR (a.col3 > 10 AND a.col3 < 50)"},
-
- new Object[]{"SELECT col1, SUM(col3) FROM a WHERE a.col3 BETWEEN 23
AND 36 "
- + " GROUP BY col1 HAVING SUM(col3) > 10.0 AND MIN(col3) <> 123 AND
MAX(col3) BETWEEN 10 AND 20"},
-
- new Object[]{"SELECT col1, SUM(col3) FROM a WHERE (col3 > 0 AND col3 <
45) AND (col3 > 15 AND col3 < 50) "
- + " GROUP BY col1 HAVING (SUM(col3) > 10 AND SUM(col3) < 20) AND
(SUM(col3) > 30 AND SUM(col3) < 40)"},
-
- // Projection pushdown
- new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0
AND a.col2 = 'alice'"},
-
- // Inequality JOIN & partial filter pushdown
- new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3
>= 0 AND a.col3 > b.col3"},
-
- new Object[]{"SELECT * FROM a, b WHERE a.col1 > b.col2 AND a.col3 >
b.col3"},
-
- // Aggregation with group by
- new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0
GROUP BY a.col1"},
-
- // Aggregation with multiple group key
- new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3
>= 0 GROUP BY a.col1, a.col2"},
-
- // Aggregation without GROUP BY
- new Object[]{"SELECT SUM(col3) FROM a WHERE a.col3 >= 0 AND a.col2 =
'alice'"},
-
- // Aggregation with GROUP BY on a count star reference
- new Object[]{"SELECT a.col1, COUNT(*) FROM a WHERE a.col3 >= 0 GROUP
BY a.col1"},
-
- // project in intermediate stage
- // Specifically table A has 15 rows (10 on server1 and 5 on server2)
and table B has 5 rows (all on server1),
- // col1 on both are "foo", "bar", "alice", "bob", "charlie"
- // col2 on both are "foo", "bar", "alice", "foo", "bar",
- // filtered at : ^ ^
- // thus the final JOIN result will have 6 rows: 3 "foo" <-> "foo"; and
3 "bob" <-> "bob"
- new Object[]{"SELECT a.col1, a.col2, a.ts, b.col1, b.col3 FROM a JOIN
b ON a.col1 = b.col2 "
- + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0"},
-
- // Making transform after JOIN, number of rows should be the same as
JOIN result.
- new Object[]{"SELECT a.col1, a.ts, a.col3 - b.col3 FROM a JOIN b ON
a.col1 = b.col2 "
- + " WHERE a.col3 >= 0 AND b.col3 >= 0"},
-
- // Making transform after GROUP-BY, number of rows should be the same
as GROUP-BY result.
- new Object[]{"SELECT a.col1, a.col2, SUM(a.col3) - MIN(a.col3) FROM a"
- + " WHERE a.col3 >= 0 GROUP BY a.col1, a.col2"},
-
- // GROUP BY after JOIN
- // - optimizable transport for GROUP BY key after JOIN, using
SINGLETON exchange
- // only 3 GROUP BY key exist because b.col2 cycles between "foo",
"bar", "alice".
- new Object[]{"SELECT a.col1, SUM(b.col3), COUNT(*), SUM(2) FROM a JOIN
b ON a.col1 = b.col2 "
- + " WHERE a.col3 >= 0 GROUP BY a.col1"},
- // - non-optimizable transport for GROUP BY key after JOIN, using
HASH exchange
- // only 2 GROUP BY key exist for b.col3.
- new Object[]{"SELECT b.col3, SUM(a.col3) FROM a JOIN b"
- + " on a.col1 = b.col1 AND a.col2 = b.col2 GROUP BY b.col3"},
-
- // Sub-query
- new Object[]{"SELECT b.col1, b.col3, i.maxVal FROM b JOIN "
- + " (SELECT a.col2 AS joinKey, MAX(a.col3) AS maxVal FROM a GROUP
BY a.col2) AS i "
- + " ON b.col1 = i.joinKey"},
-
- // Sub-query with IN clause to SEMI JOIN.
- new Object[]{"SELECT b.col1, b.col2, SUM(b.col3) * 100 / COUNT(b.col3)
FROM b WHERE b.col1 IN "
- + " (SELECT a.col2 FROM a WHERE a.col2 != 'foo') GROUP BY b.col1,
b.col2"},
- new Object[]{"SELECT SUM(b.col3) FROM b WHERE b.col3 > (SELECT
AVG(a.col3) FROM a WHERE a.col2 != 'bar')"},
-
- // Aggregate query with HAVING clause, "foo" and "bar" occurred 6/2
times each and "alice" occurred 3/1 times
- // numbers are cycle in (1, 42, 1, 42, 1), and (foo, bar, alice, foo,
bar)
- // - COUNT(*) < 5 matches "alice" (3 times)
- // - COUNT(*) > 5 matches "foo" and "bar" (6 times); so both will be
selected out SUM(a.col3) = (1 + 42) * 3
- // - last condition doesn't match anything.
- // total to 3 rows.
- new Object[]{"SELECT a.col2, COUNT(*), MAX(a.col3), MIN(a.col3),
SUM(a.col3) FROM a GROUP BY a.col2 "
- + "HAVING COUNT(*) < 5 OR (COUNT(*) > 5 AND SUM(a.col3) >= 10)"
- + "OR (MIN(a.col3) != 20 AND SUM(a.col3) = 100)"},
- new Object[]{"SELECT COUNT(*) AS Count, MAX(a.col3) AS \"max\" FROM a
GROUP BY a.col2 "
- + "HAVING Count > 1 AND \"max\" < 50"},
-
- // Order-by
- new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON a.col1 =
b.col1 ORDER BY a.col3, b.col3 DESC"},
- new Object[]{"SELECT MAX(a.col3) FROM a GROUP BY a.col2 ORDER BY
MAX(a.col3) - MIN(a.col3)"},
-
- // Test CAST
- // - implicit CAST
- new Object[]{"SELECT a.col1, a.col2, AVG(a.col3) FROM a GROUP BY
a.col1, a.col2"},
- new Object[]{"SELECT a.col1 FROM a WHERE a.col3 >= 0.5 AND a.col3 <
0.7 OR a.col3 = 42.0"},
- new Object[]{"SELECT a.col1, SUM(a.col3) FROM a GROUP BY a.col1 "
- + " HAVING MIN(a.col3) > 0.5 AND MIN(a.col3) <> 0.7 OR MIN(a.col3)
> 30"},
- // - explicit CAST
- new Object[]{"SELECT a.col1, CAST(SUM(a.col3) AS BIGINT) FROM a GROUP
BY a.col1"},
-
- // Test DISTINCT
- // - distinct value done via GROUP BY with empty expr aggregation
list.
- new Object[]{"SELECT a.col2, a.col3 FROM a JOIN b ON a.col1 = b.col1 "
- + " WHERE b.col3 > 0 GROUP BY a.col2, a.col3"},
-
- // Test optimized constant literal.
- new Object[]{"SELECT col1 FROM a WHERE col3 > 0 AND col3 < -5"},
- new Object[]{"SELECT COALESCE(SUM(col3), 0) FROM a WHERE col1 = 'foo'
AND col1 = 'bar'"},
- new Object[]{"SELECT SUM(CAST(col3 AS INTEGER)) FROM a HAVING
MIN(col3) BETWEEN 1 AND 0"},
- new Object[]{"SELECT col1, COUNT(col3) FROM a GROUP BY col1 HAVING
SUM(col3) > 40 AND SUM(col3) < 30"},
- };
- }
-
@DataProvider(name = "testDataWithSqlToFinalRowCount")
private Object[][] provideTestSqlAndRowCount() {
return new Object[][] {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 702e7b466d..9ad1723362 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestUtils;
import org.apache.pinot.query.QueryServerEnclosure;
+import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.service.QueryConfig;
@@ -50,7 +51,7 @@ import org.testng.annotations.BeforeClass;
-public class QueryRunnerTestBase {
+public class QueryRunnerTestBase extends QueryTestSet {
private static final File INDEX_DIR_S1_A = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableA");
private static final File INDEX_DIR_S1_B = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableB");
private static final File INDEX_DIR_S1_C = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableC");
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
index 9057d1a7ec..34e52f56bb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Random;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.runtime.QueryRunner;
@@ -32,11 +33,10 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-public class QueryDispatcherTest {
+public class QueryDispatcherTest extends QueryTestSet {
private static final Random RANDOM_REQUEST_ID_GEN = new Random();
private static final int QUERY_SERVER_COUNT = 2;
private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
@@ -70,23 +70,13 @@ public class QueryDispatcherTest {
}
}
- @Test(dataProvider = "testDataWithSqlToCompiledAsWorkerRequest")
+ @Test(dataProvider = "testSql")
public void testQueryDispatcherCanSendCorrectPayload(String sql)
throws Exception {
QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
QueryDispatcher dispatcher = new QueryDispatcher();
int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(),
queryPlan);
Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId));
- }
-
- @DataProvider(name = "testDataWithSqlToCompiledAsWorkerRequest")
- private Object[][] provideTestSqlToCompiledToWorkerRequest() {
- return new Object[][] {
- new Object[]{"SELECT * FROM b"},
- new Object[]{"SELECT * FROM a"},
- new Object[]{"SELECT * FROM a JOIN b ON a.col3 = b.col3"},
- new Object[]{"SELECT a.col1, a.ts, c.col2, c.col3 FROM a JOIN c ON
a.col1 = c.col2 "
- + " WHERE (a.col3 >= 0 OR a.col2 = 'foo') AND c.col3 >= 0"},
- };
+ dispatcher.shutdown();
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index 245f78e651..7202ab8d91 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.service;
import com.google.common.collect.Lists;
+import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Comparator;
import java.util.HashMap;
@@ -31,6 +32,7 @@ import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.StageNode;
@@ -42,13 +44,12 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
-public class QueryServerTest {
+public class QueryServerTest extends QueryTestSet {
private static final Random RANDOM_REQUEST_ID_GEN = new Random();
private static final int QUERY_SERVER_COUNT = 2;
private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
@@ -88,7 +89,7 @@ public class QueryServerTest {
}
@SuppressWarnings("unchecked")
- @Test(dataProvider = "testDataWithSqlToCompiledAsWorkerRequest")
+ @Test(dataProvider = "testSql")
public void testWorkerAcceptsWorkerRequestCorrect(String sql)
throws Exception {
QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
@@ -126,17 +127,6 @@ public class QueryServerTest {
}
}
- @DataProvider(name = "testDataWithSqlToCompiledAsWorkerRequest")
- private Object[][] provideTestSqlToCompiledToWorkerRequest() {
- return new Object[][] {
- new Object[]{"SELECT * FROM b"},
- new Object[]{"SELECT * FROM a"},
- new Object[]{"SELECT * FROM a JOIN b ON a.col3 = b.col3"},
- new Object[]{"SELECT a.col1, a.ts, c.col2, c.col3 FROM a JOIN c ON
a.col1 = c.col2 "
- + " WHERE (a.col3 >= 0 OR a.col2 = 'foo') AND c.col3 >= 0"},
- };
- }
-
private static boolean isMetadataMapsEqual(StageMetadata left, StageMetadata
right) {
return left.getServerInstances().equals(right.getServerInstances())
&&
left.getServerInstanceToSegmentsMap().equals(right.getServerInstanceToSegmentsMap())
@@ -163,11 +153,12 @@ public class QueryServerTest {
private void submitRequest(Worker.QueryRequest queryRequest) {
String host = queryRequest.getMetadataMap().get("SERVER_INSTANCE_HOST");
int port = Integer.parseInt(queryRequest.getMetadataMap().get("SERVER_INSTANCE_PORT"));
- PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub =
- PinotQueryWorkerGrpc.newBlockingStub(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build());
+ ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
+ PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub = PinotQueryWorkerGrpc.newBlockingStub(channel);
Worker.QueryResponse resp = stub.submit(queryRequest);
// TODO: validate meaningful return value
Assert.assertNotNull(resp.getMetadataMap().get("OK"));
+ channel.shutdown();
}
private Worker.QueryRequest getQueryRequest(QueryPlan queryPlan, int stageId) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]