ankitsultana commented on code in PR #10401:
URL: https://github.com/apache/pinot/pull/10401#discussion_r1132660323
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -135,7 +135,8 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
_queryRunnerExecutorService = Executors.newFixedThreadPool(
ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
new NamedThreadFactory("query_runner_on_" + _port + "_port"));
- _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _queryWorkerExecutorService);
+ _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _queryWorkerExecutorService,
+ releaseMs);
Review Comment:
I think you may want to call the two-arg constructor here? Otherwise the cancellation TTL is the same as `releaseMs`.
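To make the constructor-chaining point concrete, here is a minimal sketch of how the two constructors in this PR relate. `SchedulerServiceSketch`, its accessor, and `ConstructorChainingDemo` are hypothetical names; the scheduler and worker pool are reduced to `Object` placeholders, and only the 60_000L default comes from the diff. The `releaseMs` value used below is arbitrary and is not Pinot's actual default.

```java
// Hypothetical, trimmed-down stand-in for OpChainSchedulerService: only the
// constructor chaining shown in the PR diff is modeled.
class SchedulerServiceSketch {
  // Mirrors DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS from the diff.
  static final long DEFAULT_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;

  private final long _cancellationSignalRetentionMs;

  // Two-arg form: falls back to the default retention.
  // `scheduler` and `workerPool` are Object placeholders for OpChainScheduler and ExecutorService.
  SchedulerServiceSketch(Object scheduler, Object workerPool) {
    this(scheduler, workerPool, DEFAULT_CANCELLATION_SIGNAL_RETENTION_MS);
  }

  // Three-arg form: whatever is passed last becomes the cancel-signal retention.
  SchedulerServiceSketch(Object scheduler, Object workerPool, long cancellationSignalRetentionMs) {
    _cancellationSignalRetentionMs = cancellationSignalRetentionMs;
  }

  long getCancellationSignalRetentionMs() {
    return _cancellationSignalRetentionMs;
  }
}

class ConstructorChainingDemo {
  public static void main(String[] args) {
    long releaseMs = 10_000L; // illustrative value only, not Pinot's default
    // Three-arg call, as in the QueryRunner change: retention equals releaseMs.
    SchedulerServiceSketch threeArg = new SchedulerServiceSketch(null, null, releaseMs);
    // Two-arg call, as suggested in the review: retention keeps its own default.
    SchedulerServiceSketch twoArg = new SchedulerServiceSketch(null, null);
    System.out.println(threeArg.getCancellationSignalRetentionMs());
    System.out.println(twoArg.getCancellationSignalRetentionMs());
  }
}
```

With the three-arg call, the cancel-signal retention is tied to whatever `releaseMs` happens to be; the two-arg call keeps the two settings independent.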
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -81,21 +84,38 @@ public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
MailboxService<TransferableBlock> mailboxService, long timeoutMs,
Map<String, String> queryOptions,
Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
throws Exception {
- // submit all the distributed stages.
- int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
- // run reduce stage and return result.
- MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId);
- MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService,
- queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), requestId,
- reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(),
- new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs);
- List<DataBlock> resultDataBlocks =
- reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, executionStatsAggregator, queryPlan);
- return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(), queryPlan.getQueryStageMap().get(0).getDataSchema());
+ try {
+ // submit all the distributed stages.
+ int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
+ // run reduce stage and return result.
+ return runReducer(requestId, queryPlan, reduceStageId, timeoutMs, mailboxService, executionStatsAggregator);
+ } catch (Exception e) {
+ cancel(requestId, queryPlan);
+ throw new RuntimeException("Error executing query: " + ExplainPlanStageVisitor.explain(queryPlan), e);
+ }
}
- public int submit(long requestId, QueryPlan queryPlan, long timeoutMs, Map<String, String> queryOptions)
+ private void cancel(long requestId, QueryPlan queryPlan) {
+ Set<DispatchClient> dispatchClientSet = new HashSet<>();
+ for (Map.Entry<Integer, StageMetadata> stage : queryPlan.getStageMetadataMap().entrySet()) {
+ int stageId = stage.getKey();
+ // stage rooting at a mailbox receive node means reduce stage.
+ if (!(queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode)) {
+ List<VirtualServer> serverInstances = stage.getValue().getServerInstances();
+ for (VirtualServer serverInstance : serverInstances) {
+ String host = serverInstance.getHostname();
+ int servicePort = serverInstance.getQueryServicePort();
+ dispatchClientSet.add(getOrCreateDispatchClient(host, servicePort));
Review Comment:
The dispatch client is cached anyway, so do we need the set?
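Whether the set is redundant depends on what it deduplicates. Assuming `getOrCreateDispatchClient` returns the same cached instance for a given host and port (as the comment says), the `HashSet` collapses repeated host:port pairs into one client, so each server would get a single cancel call even if several `VirtualServer` entries share an address. The sketch below models that with hypothetical `ServerAddress`, `FakeDispatchClient`, and `DispatchClientCache` classes; it is not Pinot's `DispatchClient` API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical host/port pair standing in for VirtualServer's hostname and query service port.
final class ServerAddress {
  final String _host;
  final int _port;

  ServerAddress(String host, int port) {
    _host = host;
    _port = port;
  }
}

// Hypothetical client; it only counts how many cancel calls it received.
final class FakeDispatchClient {
  int _cancelCalls = 0;

  void cancel(long requestId) {
    _cancelCalls++;
  }
}

final class DispatchClientCache {
  private final Map<String, FakeDispatchClient> _clients = new ConcurrentHashMap<>();

  // Returns the same instance for a given host:port, like a cached getOrCreate lookup.
  FakeDispatchClient getOrCreate(String host, int port) {
    return _clients.computeIfAbsent(host + "_" + port, key -> new FakeDispatchClient());
  }

  // Collects one client per distinct host:port (FakeDispatchClient does not override
  // equals, so the HashSet dedupes by identity), then cancels once per client.
  Set<FakeDispatchClient> cancelAll(long requestId, List<ServerAddress> servers) {
    Set<FakeDispatchClient> targets = new HashSet<>();
    for (ServerAddress server : servers) {
      targets.add(getOrCreate(server._host, server._port));
    }
    for (FakeDispatchClient client : targets) {
      client.cancel(requestId);
    }
    return targets;
  }
}

class DispatchCancelDemo {
  public static void main(String[] args) {
    DispatchClientCache cache = new DispatchClientCache();
    List<ServerAddress> servers = List.of(
        new ServerAddress("host1", 9000), new ServerAddress("host1", 9000), new ServerAddress("host2", 9000));
    Set<FakeDispatchClient> cancelled = cache.cancelAll(42L, servers);
    System.out.println(cancelled.size());
  }
}
```

In this sketch, dropping the set and calling `cancel` inside the loop would cancel twice on `host1`; whether that is harmless depends on whether the server-side cancel is idempotent, which the diff does not show.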
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -38,15 +40,28 @@
@SuppressWarnings("UnstableApiUsage")
public class OpChainSchedulerService extends AbstractExecutionThreadService {
private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
- // Default time scheduler is allowed to wait for a runnable OpChain to be available
+ /**
+ * Default time scheduler is allowed to wait for a runnable OpChain to be available.
+ */
private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
+ /**
+ * Default cancel signal retention, this should be set to several times larger than
+ * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
+ */
+ private static final long DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
private final OpChainScheduler _scheduler;
private final ExecutorService _workerPool;
+ private final Cache<Long, Long> _cancelledRequests;
public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+ this(scheduler, workerPool, DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS);
+ }
+
+ public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool, long releaseTimeoutMs) {
Review Comment:
Can we rename `releaseTimeoutMs` to a more appropriate name?
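Judging from the two-arg constructor, which passes `DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS` as the third argument, this parameter is a retention window for cancel signals rather than a release timeout, so a name along the lines of `cancellationSignalRetentionMs` (a suggestion, not from the PR) may fit better. It presumably sets the expiry of `_cancelledRequests`, whose `Cache<Long, Long>` type is likely Guava's, where such a window would typically be configured with `CacheBuilder.expireAfterWrite`. The sketch below is a JDK-only stand-in with an injectable clock; `CancellationRegistry` and `CancellationRegistryDemo` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical, JDK-only stand-in for a "cancelled requests" cache whose entries
// expire after a fixed retention window (not the PR's actual Cache implementation).
final class CancellationRegistry {
  private final long _cancellationSignalRetentionMs;
  private final LongSupplier _clockMs;
  // requestId -> clock time (ms) at which the cancel signal was recorded
  private final Map<Long, Long> _cancelledAt = new ConcurrentHashMap<>();

  CancellationRegistry(long cancellationSignalRetentionMs, LongSupplier clockMs) {
    _cancellationSignalRetentionMs = cancellationSignalRetentionMs;
    _clockMs = clockMs;
  }

  void cancel(long requestId) {
    _cancelledAt.put(requestId, _clockMs.getAsLong());
  }

  // True while the signal is younger than the retention window;
  // stale entries are removed lazily when they are looked up.
  boolean isCancelled(long requestId) {
    Long recordedAt = _cancelledAt.get(requestId);
    if (recordedAt == null) {
      return false;
    }
    if (_clockMs.getAsLong() - recordedAt >= _cancellationSignalRetentionMs) {
      _cancelledAt.remove(requestId, recordedAt);
      return false;
    }
    return true;
  }
}

class CancellationRegistryDemo {
  public static void main(String[] args) {
    long[] now = {0L}; // manual clock so the example is deterministic
    CancellationRegistry registry = new CancellationRegistry(60_000L, () -> now[0]);
    registry.cancel(7L);
    now[0] = 59_999L;
    System.out.println(registry.isCancelled(7L));
    now[0] = 60_000L;
    System.out.println(registry.isCancelled(7L));
  }
}
```

The 60_000L window mirrors the default in the diff; the javadoc's guidance that the retention be several times the scheduler release timeout is a tuning rule from the PR and is not enforced by this sketch.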