This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fbd673482b [multistage] [feature] Add a query option to pass some v1
limit (#9957)
fbd673482b is described below
commit fbd673482b909e656c924274927ab3f58d6dd6cf
Author: Yao Liu <[email protected]>
AuthorDate: Thu Jan 19 14:54:57 2023 -0800
[multistage] [feature] Add a query option to pass some v1 limit (#9957)
---
.../MultiStageBrokerRequestHandler.java | 10 ++--
.../common/utils/config/QueryOptionsUtils.java | 24 +++++++++
.../core/plan/maker/InstancePlanMakerImplV2.java | 34 ++++++++----
.../tests/ClusterIntegrationTestUtils.java | 30 +++++++++--
.../tests/MultiStageEngineIntegrationTest.java | 9 ++++
.../runtime/plan/ServerRequestPlanVisitor.java | 60 +++++++++++++---------
.../pinot/query/service/QueryDispatcher.java | 10 ++--
.../pinot/query/service/QueryDispatcherTest.java | 2 +-
.../apache/pinot/spi/utils/CommonConstants.java | 5 ++
9 files changed, 137 insertions(+), 47 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index c9c604d9db..f5295bad52 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -98,7 +98,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
// it is OK to ignore the onDataAvailable callback because the broker
top-level operators
// always run in-line (they don't have any scheduler)
- _mailboxService = MultiplexingMailboxService.newInstance(_reducerHostname,
_reducerPort, config, ignored -> { });
+ _mailboxService = MultiplexingMailboxService.newInstance(_reducerHostname,
_reducerPort, config, ignored -> {
+ });
// TODO: move this to a startUp() function.
_mailboxService.start();
@@ -165,7 +166,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
ResultTable queryResults;
try {
- queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan,
_mailboxService, queryTimeoutMs);
+ queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan,
_mailboxService, queryTimeoutMs,
+ sqlNodeAndOptions.getOptions());
} catch (Exception e) {
LOGGER.info("query execution failed", e);
return new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
@@ -175,8 +177,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
long executionEndTimeNs = System.nanoTime();
// Set total query processing time
- long totalTimeMs =
TimeUnit.NANOSECONDS.toMillis(sqlNodeAndOptions.getParseTimeNs()
- + (executionEndTimeNs - compilationStartTimeNs));
+ long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(
+ sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs -
compilationStartTimeNs));
brokerResponse.setTimeUsedMs(totalTimeMs);
brokerResponse.setResultTable(queryResults);
requestContext.setQueryProcessingTime(totalTimeMs);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 0fa3c82b33..420924af0a 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -166,4 +166,28 @@ public class QueryOptionsUtils {
public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
}
+
+ @Nullable
+ public static Integer getMultiStageLeafLimit(Map<String, String>
queryOptions) {
+ String maxLeafLimitStr =
queryOptions.get(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT);
+ return maxLeafLimitStr != null ? Integer.parseInt(maxLeafLimitStr) : null;
+ }
+
+ @Nullable
+ public static Integer getNumGroupsLimit(Map<String, String> queryOptions) {
+ String maxNumGroupLimit =
queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
+ return maxNumGroupLimit != null ? Integer.parseInt(maxNumGroupLimit) :
null;
+ }
+
+ @Nullable
+ public static Integer getMaxInitialResultHolderCapacity(Map<String, String>
queryOptions) {
+ String maxInitResultCap =
queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ return maxInitResultCap != null ? Integer.parseInt(maxInitResultCap) :
null;
+ }
+
+ @Nullable
+ public static Integer getGroupTrimThreshold(Map<String, String>
queryOptions) {
+ String groupByTrimThreshold =
queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD);
+ return groupByTrimThreshold != null ?
Integer.parseInt(groupByTrimThreshold) : null;
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 391a8797f0..4510e3e9fa 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -209,27 +209,39 @@ public class InstancePlanMakerImplV2 implements PlanMaker
{
// Set group-by query options
if (QueryContextUtils.isAggregationQuery(queryContext) &&
queryContext.getGroupByExpressions() != null) {
-
// Set maxInitialResultHolderCapacity
-
queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity);
-
+ Integer initResultCap =
QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions);
+ if (initResultCap != null) {
+ queryContext.setMaxInitialResultHolderCapacity(initResultCap);
+ } else {
+
queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity);
+ }
// Set numGroupsLimit
- queryContext.setNumGroupsLimit(_numGroupsLimit);
-
+ Integer numGroupsLimit =
QueryOptionsUtils.getNumGroupsLimit(queryOptions);
+ if (numGroupsLimit != null) {
+ queryContext.setNumGroupsLimit(numGroupsLimit);
+ } else {
+ queryContext.setNumGroupsLimit(_numGroupsLimit);
+ }
// Set minSegmentGroupTrimSize
Integer minSegmentGroupTrimSizeFromQuery =
QueryOptionsUtils.getMinSegmentGroupTrimSize(queryOptions);
- int minSegmentGroupTrimSize =
- minSegmentGroupTrimSizeFromQuery != null ?
minSegmentGroupTrimSizeFromQuery : _minSegmentGroupTrimSize;
- queryContext.setMinSegmentGroupTrimSize(minSegmentGroupTrimSize);
-
+ if (minSegmentGroupTrimSizeFromQuery != null) {
+
queryContext.setMinSegmentGroupTrimSize(minSegmentGroupTrimSizeFromQuery);
+ } else {
+ queryContext.setMinSegmentGroupTrimSize(_minSegmentGroupTrimSize);
+ }
// Set minServerGroupTrimSize
Integer minServerGroupTrimSizeFromQuery =
QueryOptionsUtils.getMinServerGroupTrimSize(queryOptions);
int minServerGroupTrimSize =
minServerGroupTrimSizeFromQuery != null ?
minServerGroupTrimSizeFromQuery : _minServerGroupTrimSize;
queryContext.setMinServerGroupTrimSize(minServerGroupTrimSize);
-
// Set groupTrimThreshold
- queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
+ Integer groupTrimThreshold =
QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+ if (groupTrimThreshold != null) {
+ queryContext.setGroupTrimThreshold(groupTrimThreshold);
+ } else {
+ queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
+ }
}
}
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 3a3e867348..67a1ea78f1 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -560,11 +560,28 @@ public class ClusterIntegrationTestUtils {
testQuery(pinotQuery, brokerUrl, pinotConnection, h2Query, h2Connection,
headers, null);
}
+ /**
+ * Compare # of rows in pinot and H2 only. Succeed if # of rows matches.
Note this only applies to non-aggregation
+ * query.
+ */
+ static void testQueryWithMatchingRowCount(String pinotQuery, String
brokerUrl,
+ org.apache.pinot.client.Connection pinotConnection, String h2Query,
Connection h2Connection,
+ @Nullable Map<String, String> headers, @Nullable Map<String, String>
extraJsonProperties)
+ throws Exception {
+ try {
+ testQueryInternal(pinotQuery, brokerUrl, pinotConnection, h2Query,
h2Connection, headers, extraJsonProperties,
+ true);
+ } catch (Exception e) {
+ failure(pinotQuery, h2Query, "Caught exception while testing query!", e);
+ }
+ }
+
static void testQuery(String pinotQuery, String brokerUrl,
org.apache.pinot.client.Connection pinotConnection,
String h2Query, Connection h2Connection, @Nullable Map<String, String>
headers,
@Nullable Map<String, String> extraJsonProperties) {
try {
- testQueryInternal(pinotQuery, brokerUrl, pinotConnection, h2Query,
h2Connection, headers, extraJsonProperties);
+ testQueryInternal(pinotQuery, brokerUrl, pinotConnection, h2Query,
h2Connection, headers, extraJsonProperties,
+ false);
} catch (Exception e) {
failure(pinotQuery, h2Query, "Caught exception while testing query!", e);
}
@@ -572,7 +589,8 @@ public class ClusterIntegrationTestUtils {
private static void testQueryInternal(String pinotQuery, String brokerUrl,
org.apache.pinot.client.Connection pinotConnection, String h2Query,
Connection h2Connection,
- @Nullable Map<String, String> headers, @Nullable Map<String, String>
extraJsonProperties)
+ @Nullable Map<String, String> headers, @Nullable Map<String, String>
extraJsonProperties,
+ boolean matchingRowCount)
throws Exception {
// broker response
JsonNode pinotResponse = ClusterTest.postQuery(pinotQuery, brokerUrl,
headers, extraJsonProperties);
@@ -609,7 +627,13 @@ public class ClusterIntegrationTestUtils {
List<String> expectedOrderByValues = new ArrayList<>();
int h2NumRows = getH2ExpectedValues(expectedValues,
expectedOrderByValues, h2ResultSet, h2ResultSet.getMetaData(),
orderByColumns);
-
+ if (matchingRowCount) {
+ if (numRows != h2NumRows) {
+ throw new RuntimeException("Pinot # of rows " + numRows + " doesn't
match h2 # of rows " + h2NumRows);
+ } else {
+ return;
+ }
+ }
comparePinotResultsWithExpectedValues(expectedValues,
expectedOrderByValues, resultTableResultSet, orderByColumns,
pinotQuery, h2Query, h2NumRows, pinotNumRecordsSelected);
} else {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 79a1c9c786..651a8da1bd 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -99,6 +99,15 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
super.testGeneratedQueries(false, true);
}
+ @Test
+ public void testQueryOptions()
+ throws Exception {
+ String pinotQuery = "SET multistageLeafLimit = 1; SELECT * FROM mytable;";
+ String h2Query = "SELECT * FROM mytable limit 1";
+ ClusterIntegrationTestUtils.testQueryWithMatchingRowCount(pinotQuery,
_brokerBaseApiUrl, getPinotConnection(),
+ h2Query, getH2Connection(), null, ImmutableMap.of("queryOptions",
"useMultistageEngine=true"));
+ }
+
@Override
protected Connection getPinotConnection() {
Properties properties = new Properties();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index d7ef580824..071a9325bd 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -60,6 +61,8 @@ import
org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQuer
import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -73,13 +76,12 @@ import
org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
*/
public class ServerRequestPlanVisitor implements StageNodeVisitor<Void,
ServerPlanRequestContext> {
private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ServerRequestPlanVisitor.class);
private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
- ImmutableList.of(
- PredicateComparisonRewriter.class.getName(),
- NonAggregationGroupByToDistinctQueryRewriter.class.getName()
- );
- private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(
- QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
+ ImmutableList.of(PredicateComparisonRewriter.class.getName(),
+ NonAggregationGroupByToDistinctQueryRewriter.class.getName());
+ private static final List<QueryRewriter> QUERY_REWRITERS =
+ new
ArrayList<>(QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
private static final ServerRequestPlanVisitor INSTANCE = new
ServerRequestPlanVisitor();
@@ -92,11 +94,18 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
long requestId =
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
long timeoutMs =
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
PinotQuery pinotQuery = new PinotQuery();
- pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
+ Integer leafNodeLimit =
QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap);
+ if (leafNodeLimit != null) {
+ pinotQuery.setLimit(leafNodeLimit);
+ } else {
+ pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
+ }
+ LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
pinotQuery.setExplain(false);
- ServerPlanRequestContext context = new
ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(),
- timeoutMs, stagePlan.getServerInstance().getHostname(),
stagePlan.getServerInstance().getPort(),
- stagePlan.getMetadataMap(), pinotQuery, tableType, timeBoundaryInfo);
+ ServerPlanRequestContext context =
+ new ServerPlanRequestContext(mailboxService, requestId,
stagePlan.getStageId(), timeoutMs,
+ stagePlan.getServerInstance().getHostname(),
stagePlan.getServerInstance().getPort(),
+ stagePlan.getMetadataMap(), pinotQuery, tableType,
timeBoundaryInfo);
// visit the plan and create query physical plan.
ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
@@ -112,8 +121,8 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
// 2. set pinot query options according to requestMetadataMap
-
pinotQuery.setQueryOptions(ImmutableMap.of(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
- String.valueOf(timeoutMs)));
+ pinotQuery.setQueryOptions(
+
ImmutableMap.of(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
String.valueOf(timeoutMs)));
// 3. wrapped around in broker request
BrokerRequest brokerRequest = new BrokerRequest();
@@ -145,19 +154,20 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
public Void visitAggregate(AggregateNode node, ServerPlanRequestContext
context) {
visitChildren(node, context);
// set group-by list
-
context.getPinotQuery().setGroupByList(CalciteRexExpressionParser.convertGroupByList(
- node.getGroupSet(), context.getPinotQuery()));
+ context.getPinotQuery()
+
.setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(),
context.getPinotQuery()));
// set agg list
-
context.getPinotQuery().setSelectList(CalciteRexExpressionParser.addSelectList(
- context.getPinotQuery().getGroupByList(), node.getAggCalls(),
context.getPinotQuery()));
+ context.getPinotQuery().setSelectList(
+
CalciteRexExpressionParser.addSelectList(context.getPinotQuery().getGroupByList(),
node.getAggCalls(),
+ context.getPinotQuery()));
return _aVoid;
}
@Override
public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
visitChildren(node, context);
-
context.getPinotQuery().setFilterExpression(CalciteRexExpressionParser.toExpression(
- node.getCondition(), context.getPinotQuery()));
+ context.getPinotQuery()
+
.setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(),
context.getPinotQuery()));
return _aVoid;
}
@@ -182,8 +192,8 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
@Override
public Void visitProject(ProjectNode node, ServerPlanRequestContext context)
{
visitChildren(node, context);
-
context.getPinotQuery().setSelectList(CalciteRexExpressionParser.overwriteSelectList(
- node.getProjects(), context.getPinotQuery()));
+ context.getPinotQuery()
+
.setSelectList(CalciteRexExpressionParser.overwriteSelectList(node.getProjects(),
context.getPinotQuery()));
return _aVoid;
}
@@ -191,8 +201,9 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
public Void visitSort(SortNode node, ServerPlanRequestContext context) {
visitChildren(node, context);
if (node.getCollationKeys().size() > 0) {
-
context.getPinotQuery().setOrderByList(CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(),
- node.getCollationDirections(), context.getPinotQuery()));
+ context.getPinotQuery().setOrderByList(
+
CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(),
node.getCollationDirections(),
+ context.getPinotQuery()));
}
if (node.getFetch() > 0) {
context.getPinotQuery().setLimit(node.getFetch());
@@ -210,8 +221,8 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
.tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
dataSource.setTableName(tableNameWithType);
context.getPinotQuery().setDataSource(dataSource);
- context.getPinotQuery().setSelectList(node.getTableScanColumns().stream()
-
.map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
+ context.getPinotQuery().setSelectList(
+
node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
return _aVoid;
}
@@ -226,6 +237,7 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
child.visit(this, context);
}
}
+
/**
* Helper method to attach the time boundary to the given PinotQuery.
*/
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 4cbd4aa624..f0eb564327 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -62,10 +62,10 @@ public class QueryDispatcher {
}
public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
- MailboxService<TransferableBlock> mailboxService, long timeoutMs)
+ MailboxService<TransferableBlock> mailboxService, long timeoutMs,
Map<String, String> queryOptions)
throws Exception {
// submit all the distributed stages.
- int reduceStageId = submit(requestId, queryPlan, timeoutMs);
+ int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
// run reduce stage and return result.
MailboxReceiveNode reduceNode = (MailboxReceiveNode)
queryPlan.getQueryStageMap().get(reduceStageId);
MailboxReceiveOperator mailboxReceiveOperator =
createReduceStageOperator(mailboxService,
@@ -83,7 +83,7 @@ public class QueryDispatcher {
return resultTable;
}
- public int submit(long requestId, QueryPlan queryPlan, long timeoutMs)
+ public int submit(long requestId, QueryPlan queryPlan, long timeoutMs,
Map<String, String> queryOptions)
throws Exception {
int reduceStageId = -1;
for (Map.Entry<Integer, StageMetadata> stage :
queryPlan.getStageMetadataMap().entrySet()) {
@@ -100,7 +100,9 @@ public class QueryDispatcher {
Worker.QueryResponse response =
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId,
serverInstance)))
.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID,
String.valueOf(requestId))
- .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
String.valueOf(timeoutMs)).build());
+ .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
String.valueOf(timeoutMs))
+ .putAllMetadata(queryOptions).build());
+
if
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
throw new RuntimeException(
String.format("Unable to execute query plan at stage %s on
server %s: ERROR: %s", stageId,
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
index cff9a26d37..b0f2dc0a5e 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
@@ -78,7 +78,7 @@ public class QueryDispatcherTest extends QueryTestSet {
throws Exception {
QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
QueryDispatcher dispatcher = new QueryDispatcher();
- int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(),
queryPlan, 10_000L);
+ int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(),
queryPlan, 10_000L, new HashMap<>());
Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId));
dispatcher.shutdown();
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3e499b30ce..0abc692fbb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -295,6 +295,11 @@ public class CommonConstants {
public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm";
+ public static final String MULTI_STAGE_LEAF_LIMIT =
"multiStageLeafLimit";
+ public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";
+ public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"maxInitialResultHolderCapacity";
+ public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
+
// TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
@Deprecated
public static final String PRESERVE_TYPE = "preserveType";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]