gianm commented on code in PR #19397:
URL: https://github.com/apache/druid/pull/19397#discussion_r3176204717
##########
multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java:
##########
@@ -297,21 +304,29 @@ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataSer
protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder segmentHolder) throws IOException
{
if (cursor == null) {
- final Segment segment = mapSegment(segmentHolder, closer);
- final CursorFactory cursorFactory = segment.as(CursorFactory.class);
- if (cursorFactory == null) {
- throw DruidException.defensive(
- "Null cursor factory found. Probably trying to issue a query against a segment being memory unmapped."
+ if (cursorHolderFuture == null) {
+ final Segment segment = mapSegment(segmentHolder, closer);
+ final CursorFactory cursorFactory = segment.as(CursorFactory.class);
+ if (cursorFactory == null) {
+ throw DruidException.defensive(
+ "Null cursor factory found. Probably trying to issue a query against a segment being memory unmapped."
+ );
+ }
+
+ cursorHolderFuture = cursorFactory.makeCursorHolderAsync(
Review Comment:
Handling futures that return closeable things is tricky. Maybe we can
improve it by changing the return of `makeCursorHolderAsync` from
`ListenableFuture<CursorHolder>` to `AsyncCursorHolder` that is closeable and
has methods `get()` (blocks if not ready), `close()` (closes the resource no
matter where it is in its lifecycle), and `addReadyCallback(Runnable)` (used by
nonblocking callers to learn when `get` is ready).
The problem with the future approach is that once this call site gets the
`cursorHolderFuture`, it's responsible for monitoring the future and closing
the `CursorHolder` if the future resolves successfully. This has to be done
even if the processor is canceled before it has a chance to run through
normally. That takes extra care and is easy to get wrong.
One way it can be handled is by attaching a callback in `cleanup` that
closes the holder in `onSuccess`, like:
```
if (cursorHolderFuture != null) {
  Futures.addCallback(
      cursorHolderFuture,
      new FutureCallback<CursorHolder>() {
        @Override
        public void onSuccess(CursorHolder holder) { holder.close(); }

        @Override
        public void onFailure(Throwable t) { /* nothing to close */ }
      },
      MoreExecutors.directExecutor()
  );
}
```
But even with this structure, it's important to watch out for pitfalls. A big
one is that you can never cancel a future that returns a closeable thing like
`CursorHolder`. Cancellation of the future can cause the object to be created,
then orphaned and eventually GCed without being closed.
If we can avoid these problems by returning something *directly* closeable
(like this `AsyncCursorHolder` idea) rather than future-of-closeable, then the
caller code becomes simpler.
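To make the idea concrete, here is a rough sketch of what such a holder could look like. Everything in it is hypothetical: `AsyncHolder` is an illustrative name rather than an existing Druid class, and `CompletableFuture` is used as a stand-in for `ListenableFuture` so the snippet compiles with only the JDK. The point is that `close()` is safe at any stage and never cancels the underlying load.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a directly closeable async holder; not an existing Druid class. */
final class AsyncHolder<T extends AutoCloseable> implements AutoCloseable
{
  private final CompletableFuture<T> future;
  private final AtomicBoolean closed = new AtomicBoolean();

  AsyncHolder(CompletableFuture<T> future)
  {
    this.future = future;
  }

  /** Blocks until the resource is ready. */
  T get() throws InterruptedException, ExecutionException
  {
    if (closed.get()) {
      throw new IllegalStateException("Already closed");
    }
    return future.get();
  }

  /** Runs the callback once the load has finished, successfully or not. */
  void addReadyCallback(Runnable callback)
  {
    future.whenComplete((resource, error) -> callback.run());
  }

  /** Safe at any point in the lifecycle. Never cancels the load, so nothing is orphaned. */
  @Override
  public void close()
  {
    if (closed.compareAndSet(false, true)) {
      // Runs immediately if the load is already done, otherwise when it finishes.
      future.whenComplete((resource, error) -> {
        if (resource != null) {
          try {
            resource.close();
          }
          catch (Exception e) {
            // Sketch only: real code would log or propagate this.
          }
        }
      });
    }
  }
}
```

With something like this, `cleanup` would only need to call `close()` on the holder, whether or not the load has finished.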
##########
processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java:
##########
@@ -494,21 +534,38 @@ public Sequence<ResultRow> process(
if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", intervals);
}
+ }
- final ResourceHolder<ByteBuffer> bufferHolder = bufferPool.take();
+ private Sequence<ResultRow> processWithCursorHolder(
+ GroupByQuery query,
+ CursorFactory cursorFactory,
+ CursorHolder cursorHolder,
+ @Nullable TimeBoundaryInspector timeBoundaryInspector,
+ NonBlockingPool<ByteBuffer> bufferPool,
+ CursorBuildSpec buildSpec
+ )
+ {
+ final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
+
+ final ResourceHolder<ByteBuffer> bufferHolder;
+ try {
+ bufferHolder = bufferPool.take();
+ }
+ catch (Throwable e) {
+ CloseableUtils.closeAndWrapExceptions(cursorHolder);
Review Comment:
`throw CloseableUtils.closeAndWrapInCatch(e, cursorHolder)` will properly
retain exceptions from closing `cursorHolder` as suppressed exceptions on `e`.
Although, there's probably some way of structuring this code to use the
`Closer` to handle this better. Like, create the `Closer` first, register
`cursorHolder` right after it's created, start the main `try`, then register
`bufferHolder` after it's acquired. If the buffer fails to be acquired then the
`catch` will close the `Closer` and release the `cursorHolder`.
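Roughly, the shape I have in mind is sketched below. This is JDK-only and hypothetical: `SimpleCloser` is a minimal stand-in for the real `Closer`, and `acquireAll` plus the `Supplier`-based pool are made up for illustration. In the actual code the `catch` would likely be `throw CloseableUtils.closeAndWrapInCatch(e, closer)`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

final class CloserSketch
{
  /** Minimal stand-in for a Closer: closes registered resources in reverse order. */
  static final class SimpleCloser implements AutoCloseable
  {
    private final Deque<AutoCloseable> stack = new ArrayDeque<>();

    <C extends AutoCloseable> C register(C resource)
    {
      stack.push(resource);
      return resource;
    }

    @Override
    public void close() throws Exception
    {
      Exception first = null;
      while (!stack.isEmpty()) {
        try {
          stack.pop().close();
        }
        catch (Exception e) {
          if (first == null) {
            first = e;
          } else {
            first.addSuppressed(e);
          }
        }
      }
      if (first != null) {
        throw first;
      }
    }
  }

  /** Registers the cursor holder first, then acquires the buffer inside the try. */
  static SimpleCloser acquireAll(AutoCloseable cursorHolder, Supplier<AutoCloseable> bufferPool)
  {
    final SimpleCloser closer = new SimpleCloser();
    closer.register(cursorHolder);
    try {
      closer.register(bufferPool.get());
      return closer; // The caller now owns both resources through the closer.
    }
    catch (Throwable t) {
      try {
        closer.close(); // Releases the cursor holder if the buffer could not be acquired.
      }
      catch (Exception closeFailure) {
        t.addSuppressed(closeFailure);
      }
      throw t;
    }
  }
}
```

The original exception stays primary, and any failure while closing is attached to it as suppressed.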
##########
processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java:
##########
@@ -482,8 +484,46 @@ public Sequence<ResultRow> process(
@Nullable GroupByQueryMetrics groupByQueryMetrics
)
{
- final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
+ validateForProcess(query, cursorFactory);
+ final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
+ final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(buildSpec);
+ return processWithCursorHolder(query, cursorFactory, cursorHolder, timeBoundaryInspector, bufferPool, buildSpec);
+ }
+
+ /**
+ * Asynchronous variant of {@link #process} that obtains the {@link CursorHolder} from
+ * {@link CursorFactory#makeCursorHolderAsync} so callers running on threads that must not block on I/O
+ * (e.g. MSQ frame processors) can yield via {@link org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures}
+ * until the cursor holder is ready.
+ * <p>
+ * The processing-buffer reservation from {@code bufferPool} is deferred until the cursor holder is available, so
+ * the buffer is not held during cursor-holder I/O.
+ */
+ public ListenableFuture<Sequence<ResultRow>> processAsync(
+ GroupByQuery query,
+ CursorFactory cursorFactory,
+ @Nullable TimeBoundaryInspector timeBoundaryInspector,
+ NonBlockingPool<ByteBuffer> bufferPool,
+ @Nullable GroupByQueryMetrics groupByQueryMetrics
+ )
+ {
+ validateForProcess(query, cursorFactory);
+ final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
+ return FutureUtils.transform(
+ cursorFactory.makeCursorHolderAsync(buildSpec),
+ cursorHolder -> processWithCursorHolder(
Review Comment:
Won't this end up doing the main processing in whatever thread happened to
resolve the `makeCursorHolderAsync` future? (Because `FutureUtils.transform`
uses a direct executor.) Possibly that'd be in a virtual storage loader thread.
I think we'll need to either adjust this method to accept a
`ListenableExecutorService` that will be used to run `processWithCursorHolder`,
or break it up so callers first call `groupingEngine.makeCursorHolderAsync` and
then call `groupingEngine.processCursorHolder`.
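A small illustration of the first option, as a sketch only: it uses `CompletableFuture` as a JDK stand-in for `ListenableFuture`, and `transformOn`, `demo`, and the thread names are hypothetical. With Guava the analogous call would be `Futures.transform(future, fn, executor)` with a non-direct executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class TransformThreadSketch
{
  /** The continuation runs on the supplied executor, not on whichever thread completes the source. */
  static <T, R> CompletableFuture<R> transformOn(CompletableFuture<T> source, Function<T, R> fn, Executor exec)
  {
    return source.thenApplyAsync(fn, exec);
  }

  /** Completes the source on a "loader" thread and returns the name of the thread that ran the continuation. */
  static String demo()
  {
    final ExecutorService loader = Executors.newSingleThreadExecutor(r -> new Thread(r, "loader"));
    final ExecutorService processing = Executors.newSingleThreadExecutor(r -> new Thread(r, "processing"));
    try {
      final CompletableFuture<String> source = new CompletableFuture<>();
      final CompletableFuture<String> ranOn =
          transformOn(source, holder -> Thread.currentThread().getName(), processing);
      loader.execute(() -> source.complete("cursorHolder"));
      return ranOn.join();
    }
    finally {
      loader.shutdown();
      processing.shutdown();
    }
  }
}
```

Here `demo()` reports the `processing` thread. With a direct-executor transform, the continuation would instead typically run on the thread that resolved the source, which is the concern above.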
##########
processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java:
##########
@@ -482,8 +484,46 @@ public Sequence<ResultRow> process(
@Nullable GroupByQueryMetrics groupByQueryMetrics
)
{
- final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
+ validateForProcess(query, cursorFactory);
+ final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
+ final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(buildSpec);
+ return processWithCursorHolder(query, cursorFactory, cursorHolder, timeBoundaryInspector, bufferPool, buildSpec);
+ }
+
+ /**
+ * Asynchronous variant of {@link #process} that obtains the {@link CursorHolder} from
+ * {@link CursorFactory#makeCursorHolderAsync} so callers running on threads that must not block on I/O
+ * (e.g. MSQ frame processors) can yield via {@link org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures}
+ * until the cursor holder is ready.
Review Comment:
This javadoc feels a bit too specific to me. It's enough to say that this is
an asynchronous variant of `process` that uses
`CursorFactory#makeCursorHolderAsync` to avoid blocking on acquisition of
`CursorHolder`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]