agavra commented on code in PR #9836:
URL: https://github.com/apache/pinot/pull/9836#discussion_r1037342781
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -63,4 +63,9 @@ public boolean isInitialized() {
public boolean isClosed() {
return _closed && _queue.size() == 0;
}
+
+ @Override
+ public void close() {
Review Comment:
should we clear the queue here?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -130,7 +130,9 @@ public void onError(Throwable e) {
@Override
public void onCompleted() {
- _isCompleted.set(true);
- _responseObserver.onCompleted();
+ if(!_isCompleted.get()){
+ _isCompleted.set(true);
Review Comment:
there's a race condition here: the `get()` followed by `set(true)` is not
atomic, so two threads can both pass the check. It should be
`if (_isCompleted.compareAndSet(false, true))`
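
To illustrate the atomic check-and-set, here is a minimal sketch. `OnceOnlyCompletion` and its counter are hypothetical stand-ins, not the actual `MailboxContentStreamObserver`; the only real API used is `AtomicBoolean.compareAndSet`, which returns `true` only for the single caller that flips the flag from `false` to `true`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the observer; not the actual Pinot class.
class OnceOnlyCompletion {
  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
  // Counts how often the (assumed) downstream onCompleted() would be invoked.
  private final AtomicInteger _downstreamCalls = new AtomicInteger();

  void onCompleted() {
    // Atomically flips false -> true; only the thread that wins the flip enters.
    if (_isCompleted.compareAndSet(false, true)) {
      _downstreamCalls.incrementAndGet(); // stands in for _responseObserver.onCompleted()
    }
  }

  int downstreamCalls() {
    return _downstreamCalls.get();
  }

  public static void main(String[] args) throws InterruptedException {
    OnceOnlyCompletion observer = new OnceOnlyCompletion();
    Thread[] threads = new Thread[8];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(observer::onCompleted);
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    System.out.println(observer.downstreamCalls()); // prints 1
  }
}
```

If an early return reads better, the negated form `if (!_isCompleted.compareAndSet(false, true)) { return; }` gives the same guarantee.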
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java:
##########
@@ -83,6 +87,7 @@ private void shutdown() {
@Override
public void onCompleted() {
+ finishLatch.countDown();
Review Comment:
nit: maybe we move this and `_isCompleted.set(true)` to `shutdown` so both
`onError` and `onCompleted` have the same behavior.
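
A rough sketch of what routing both callbacks through `shutdown` could look like. `StatusObserverSketch` is a hypothetical stand-in, and the real `shutdown()` presumably does more than shown here; the point is only that the latch and the flag are handled in one place.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the status observer; not the actual Pinot class.
class StatusObserverSketch {
  private final CountDownLatch _finishLatch = new CountDownLatch(1);
  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);

  public void onError(Throwable t) {
    shutdown();
  }

  public void onCompleted() {
    shutdown();
  }

  // Single place that marks completion and releases any waiters.
  private void shutdown() {
    _isCompleted.set(true);
    _finishLatch.countDown(); // counting down an already-zero latch is a no-op
  }

  boolean isCompleted() {
    return _isCompleted.get();
  }

  long pendingCount() {
    return _finishLatch.getCount();
  }
}
```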
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -97,10 +97,12 @@ public void runJob() {
// not complete, needs to re-register for scheduling
register(operatorChain);
} else {
- LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+ operatorChain.getRoot().close();
Review Comment:
nit: let's keep both log statements (though looks like I forgot to change
this one to `debug`!)
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java:
##########
@@ -18,36 +18,58 @@
*/
package org.apache.pinot.query.runtime.plan;
+import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
public class PlanRequestContext {
protected final MailboxService<TransferableBlock> _mailboxService;
protected final long _requestId;
- protected final int _stageId;
protected final String _hostName;
protected final int _port;
protected final Map<Integer, StageMetadata> _metadataMap;
+ // TODO: Add exchange map if multiple exchanges are needed.
+ BlockExchange _exchange;
Review Comment:
I think it breaks some abstraction barriers to allow any piece of code that
has access to the `PlanRequestContext` to exchange blocks via a
`BlockExchange`. Only the `MailboxSendOperator` should be able to send blocks
IMO - otherwise it can be difficult to debug the ordering of events that are
sent.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -35,9 +36,12 @@ public class OpChain {
// TODO: build timers that are partial-execution aware
private final Supplier<ThreadResourceUsageProvider> _timer;
- public OpChain(Operator<TransferableBlock> root) {
- _root = root;
+ // TODO: refactor this into OpChainContext
+ public PlanRequestContext _context;
Review Comment:
nit: `private final`
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -97,10 +97,12 @@ public void runJob() {
// not complete, needs to re-register for scheduling
register(operatorChain);
} else {
- LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+ operatorChain.getRoot().close();
}
} catch (Exception e) {
- LOGGER.error("Failed to execute query!", e);
+ operatorChain._context.getExchange().send(TransferableBlockUtils.getErrorTransferableBlock(e));
Review Comment:
this breaks some abstraction boundaries - this scheduler service should know
nothing about the exchange or sending blocks; instead we should consider adding
this to `MailboxSendOperator` (which is always the root operator for these
chains). FWIW, I think that's already the case.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -97,10 +97,12 @@ public void runJob() {
// not complete, needs to re-register for scheduling
register(operatorChain);
} else {
- LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+ operatorChain.getRoot().close();
}
} catch (Exception e) {
- LOGGER.error("Failed to execute query!", e);
+ operatorChain._context.getExchange().send(TransferableBlockUtils.getErrorTransferableBlock(e));
+ // TODO: pass this error through context.
+ operatorChain.getRoot().close();
Review Comment:
consider pushing this into an `OpChain#close` method. We can also call that
method from the scheduler in situations where the scheduler leaks operator
chains (this should never happen if errors are propagated properly, but it's
good to be defensive here).
Even better would be a `scheduler.unregister(OpChain)` method that we call
here; that way we can also use it to clean up things like tracked mailboxes
(see my comment on #9887)
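
One possible shape for this, as a sketch only. `RootOperatorSketch`, `OpChainSketch` and `SchedulerSketch` are hypothetical names, and the real scheduler would also own queueing and mailbox tracking that are left out here.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the root operator of a chain.
interface RootOperatorSketch {
  void close();
}

// Hypothetical OpChain that owns closing its root operator.
class OpChainSketch {
  private final RootOperatorSketch _root;

  OpChainSketch(RootOperatorSketch root) {
    _root = root;
  }

  void close() {
    _root.close();
  }
}

// Hypothetical scheduler that closes a chain exactly once, on unregister.
class SchedulerSketch {
  private final Set<OpChainSketch> _registered = ConcurrentHashMap.newKeySet();

  void register(OpChainSketch chain) {
    _registered.add(chain);
  }

  void unregister(OpChainSketch chain) {
    // remove() returns true only for the first caller, so close() runs once.
    if (_registered.remove(chain)) {
      chain.close();
    }
  }

  // Defensive cleanup for chains that were never unregistered.
  void shutdown() {
    for (OpChainSketch chain : _registered) {
      unregister(chain);
    }
  }

  int registeredCount() {
    return _registered.size();
  }
}
```

With this shape the `catch` block in `runJob` would only need to call `unregister`, and the scheduler's own shutdown path covers any leaked chains.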
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java:
##########
@@ -33,204 +33,204 @@ public class QueryTestSet {
public Object[][] provideTestSql() {
return new Object[][]{
// Order BY LIMIT
- new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"},
- new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 10"},
- new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 20"},
- new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 1, 2"},
- new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 2 OFFSET 1"},
-
- // No match filter
- new Object[]{"SELECT * FROM b WHERE col3 < 0.5"},
-
- // Hybrid table
+// new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"},
Review Comment:
(reminder) I know this is a draft, but let's make sure these pass and
uncomment them (or delete them if we don't want them anymore)
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java:
##########
@@ -63,6 +63,12 @@ protected BaseResultsBlock getNextBlock() {
return _childOperator.nextBlock();
}
+ @Override
+ public void close()
Review Comment:
(suggestion) maybe this is the default implementation in `BaseOperator`?
(will make the review a bit easier)
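
As a sketch of that suggestion: assuming the override simply forwards `close()` to the child operator (the body is not visible in this hunk), a default in the base class could look like the following. All class names here are hypothetical, not the actual `BaseOperator` API.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical base class; the default close() forwards to every child.
abstract class BaseOperatorSketch {
  List<BaseOperatorSketch> getChildOperators() {
    return Collections.emptyList();
  }

  void close() {
    for (BaseOperatorSketch child : getChildOperators()) {
      child.close();
    }
  }
}

// Hypothetical leaf that actually holds a resource to release.
class LeafSketch extends BaseOperatorSketch {
  boolean _closed;

  @Override
  void close() {
    _closed = true;
  }
}

// Hypothetical pass-through operator: it needs no close() override of its own.
class PassThroughSketch extends BaseOperatorSketch {
  private final BaseOperatorSketch _child;

  PassThroughSketch(BaseOperatorSketch child) {
    _child = child;
  }

  @Override
  List<BaseOperatorSketch> getChildOperators() {
    return Collections.singletonList(_child);
  }
}
```

Only operators that hold resources would then override `close()`, which keeps the diff limited to those classes.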
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -76,14 +77,13 @@ protected void run()
if (!isRunning()) {
return;
}
-
OpChain operatorChain = _scheduler.next();
_workerPool.submit(new TraceRunnable() {
@Override
- public void runJob() {
+ public void runJob()
+ throws InterruptedException {
Review Comment:
note: this should never throw; if it does, the worker pool threads will just
die and we'll be left with a dangling worker pool (these issues are really
tough to debug). Instead, let's catch any exceptions here and handle them.
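
A minimal sketch of catching inside the job rather than declaring `throws`. `ThrowingJob` and `SafeJobSketch.guarded` are hypothetical helpers for illustration; in the PR the equivalent logic would live inside `runJob` itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical job type that is allowed to throw checked exceptions.
interface ThrowingJob {
  void run() throws Exception;
}

class SafeJobSketch {
  // Wraps a job so that no exception escapes into the worker thread.
  static Runnable guarded(ThrowingJob job, Consumer<Throwable> onFailure) {
    return () -> {
      try {
        job.run();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        onFailure.accept(e);
      } catch (Exception e) {
        onFailure.accept(e);
      }
    };
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.submit(guarded(
        () -> { throw new IllegalStateException("operator failed"); },
        t -> System.err.println("Failed to execute query: " + t.getMessage())));
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

The failure callback is where the error would be logged and passed on to the operator chain for cleanup.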
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]