gianm commented on code in PR #19397:
URL: https://github.com/apache/druid/pull/19397#discussion_r3176204717


##########
multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java:
##########
@@ -297,21 +304,29 @@ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataSer
   protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder segmentHolder) throws IOException
   {
     if (cursor == null) {
-      final Segment segment = mapSegment(segmentHolder, closer);
-      final CursorFactory cursorFactory = segment.as(CursorFactory.class);
-      if (cursorFactory == null) {
-        throw DruidException.defensive(
-            "Null cursor factory found. Probably trying to issue a query against a segment being memory unmapped."
+      if (cursorHolderFuture == null) {
+        final Segment segment = mapSegment(segmentHolder, closer);
+        final CursorFactory cursorFactory = segment.as(CursorFactory.class);
+        if (cursorFactory == null) {
+          throw DruidException.defensive(
+              "Null cursor factory found. Probably trying to issue a query against a segment being memory unmapped."
+          );
+        }
+
+        cursorHolderFuture = cursorFactory.makeCursorHolderAsync(

Review Comment:
   Handling futures that return closeable things is tricky. Maybe we can 
improve it by changing the return of `makeCursorHolderAsync` from 
`ListenableFuture<CursorHolder>` to `AsyncCursorHolder` that is closeable and 
has methods `get()` (blocks if not ready), `close()` (closes the resource no 
matter where it is in its lifecycle), and `addReadyCallback(Runnable)` (used by 
nonblocking callers to learn when `get` is ready).
   
   The problem with the future approach is that once this call site gets the 
`cursorHolderFuture`, it's responsible for monitoring the future and closing 
`cursorHolder` if the future resolves successfully. This has to be done even if 
the processor is canceled before it has a chance to run through normally. That 
takes extra care and is easy to get wrong.
   
   One way it can be handled is by attaching a callback in `cleanup` that 
closes the holder in `onSuccess`, like:
   
   ```
   if (cursorHolderFuture != null) {
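     Futures.addCallback(
       cursorHolderFuture,
       new FutureCallback<CursorHolder>() {
         @Override public void onSuccess(CursorHolder holder) { holder.close(); }
         @Override public void onFailure(Throwable t) { /* nothing to close */ }
       },
       // Recent Guava versions require an explicit executor for the callback.
       MoreExecutors.directExecutor()
     );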
   }
   ```
   
   But even with this structure, it's important to watch out for pitfalls. A 
big one is that you can never cancel a future that returns a closeable thing 
like `CursorHolder`. Cancellation of the future can cause the object to be 
created, then orphaned and eventually GCed without being closed.
   
   If we can avoid these problems by returning something *directly* closeable 
(like this `AsyncCursorHolder` idea) rather than future-of-closeable, then the 
caller code becomes simpler.
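
   A rough sketch of what such a holder might look like, to make the idea 
concrete. Everything here is hypothetical: the class name 
`AsyncCursorHolderSketch`, the producer-side `set` method, and the generic `T` 
standing in for `CursorHolder` are illustrative assumptions, not existing Druid 
API. Failure propagation is left out for brevity. The point is that `close()` 
is safe at any time: if the resource arrives after `close()`, it is closed 
immediately rather than orphaned.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; T stands in for CursorHolder.
class AsyncCursorHolderSketch<T extends Closeable> implements Closeable
{
  private final List<Runnable> callbacks = new ArrayList<>();
  private T value;
  private boolean ready;
  private boolean closed;

  // Producer side (assumed): publish the loaded resource. If the consumer has
  // already closed this holder, close the resource right away.
  void set(T loaded)
  {
    final boolean closeNow;
    final List<Runnable> toRun;
    synchronized (this) {
      if (ready) {
        throw new IllegalStateException("already set");
      }
      ready = true;
      closeNow = closed;
      if (!closeNow) {
        value = loaded;
      }
      toRun = new ArrayList<>(callbacks);
      callbacks.clear();
      notifyAll();
    }
    if (closeNow) {
      closeUnchecked(loaded);
    }
    toRun.forEach(Runnable::run);
  }

  // Blocks until the resource is ready; throws if the holder was closed.
  synchronized T get()
  {
    try {
      while (!ready && !closed) {
        wait();
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    if (closed) {
      throw new IllegalStateException("closed");
    }
    return value;
  }

  // Runs the callback immediately if ready, otherwise when set() is called.
  void addReadyCallback(Runnable callback)
  {
    synchronized (this) {
      if (!ready) {
        callbacks.add(callback);
        return;
      }
    }
    callback.run();
  }

  // Idempotent; closes the resource if it has already been published.
  @Override
  public void close()
  {
    final T toClose;
    synchronized (this) {
      if (closed) {
        return;
      }
      closed = true;
      toClose = value;
      value = null;
      notifyAll();
    }
    if (toClose != null) {
      closeUnchecked(toClose);
    }
  }

  private static void closeUnchecked(Closeable c)
  {
    try {
      c.close();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

   With something like this, `cleanup` could just call `close()` on the holder 
and would not need to attach a callback or reason about cancellation.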



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java:
##########
@@ -494,21 +534,38 @@ public Sequence<ResultRow> process(
     if (intervals.size() != 1) {
       throw new IAE("Should only have one interval, got[%s]", intervals);
     }
+  }
 
-    final ResourceHolder<ByteBuffer> bufferHolder = bufferPool.take();
+  private Sequence<ResultRow> processWithCursorHolder(
+      GroupByQuery query,
+      CursorFactory cursorFactory,
+      CursorHolder cursorHolder,
+      @Nullable TimeBoundaryInspector timeBoundaryInspector,
+      NonBlockingPool<ByteBuffer> bufferPool,
+      CursorBuildSpec buildSpec
+  )
+  {
+    final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
+
+    final ResourceHolder<ByteBuffer> bufferHolder;
+    try {
+      bufferHolder = bufferPool.take();
+    }
+    catch (Throwable e) {
+      CloseableUtils.closeAndWrapExceptions(cursorHolder);

Review Comment:
   `throw CloseableUtils.closeAndWrapInCatch(e, cursorHolder)` will properly 
retain exceptions from closing `cursorHolder` as suppressed exceptions on `e`.
   
   Although, there's probably some way of structuring this code to use the 
`Closer` to handle this better. Like, create the `Closer` first, register 
`cursorHolder` right after it's created, start the main `try`, then register 
`bufferHolder` after it's acquired. If the buffer fails to be acquired then the 
`catch` will close the `Closer` and release the `cursorHolder`.
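
   A minimal sketch of that ordering. It is self-contained, so it uses a tiny 
stand-in `SimpleCloser` rather than Druid's `Closer`, and plain `AutoCloseable` 
values in place of `CursorHolder` and the buffer holder. All names in it are 
hypothetical. It only shows the shape: register the first resource before the 
`try`, acquire the second inside it, and close everything from the `catch` 
while keeping close failures as suppressed exceptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Stand-in for a Closer: closes registered resources in reverse order.
class SimpleCloser implements AutoCloseable
{
  private final Deque<AutoCloseable> stack = new ArrayDeque<>();

  <C extends AutoCloseable> C register(C closeable)
  {
    stack.push(closeable);
    return closeable;
  }

  @Override
  public void close() throws Exception
  {
    Exception first = null;
    while (!stack.isEmpty()) {
      try {
        stack.pop().close();
      }
      catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}

class CloserOrderingExample
{
  // cursorHolder and bufferSupplier are hypothetical stand-ins for the
  // CursorHolder and for bufferPool.take().
  static SimpleCloser acquire(AutoCloseable cursorHolder, Supplier<AutoCloseable> bufferSupplier)
  {
    final SimpleCloser closer = new SimpleCloser();
    closer.register(cursorHolder);
    try {
      closer.register(bufferSupplier.get());
      return closer; // the caller now owns both resources through the closer
    }
    catch (Throwable t) {
      try {
        closer.close(); // releases cursorHolder if the buffer was not acquired
      }
      catch (Throwable closeError) {
        t.addSuppressed(closeError);
      }
      throw t;
    }
  }
}
```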



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java:
##########
@@ -482,8 +484,46 @@ public Sequence<ResultRow> process(
       @Nullable GroupByQueryMetrics groupByQueryMetrics
   )
   {
-    final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
+    validateForProcess(query, cursorFactory);
+    final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
+    final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(buildSpec);
+    return processWithCursorHolder(query, cursorFactory, cursorHolder, timeBoundaryInspector, bufferPool, buildSpec);
+  }
+
+  /**
+   * Asynchronous variant of {@link #process} that obtains the {@link CursorHolder} from
+   * {@link CursorFactory#makeCursorHolderAsync} so callers running on threads that must not block on I/O
+   * (e.g. MSQ frame processors) can yield via {@link org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures}
+   * until the cursor holder is ready.
+   * <p>
+   * The processing-buffer reservation from {@code bufferPool} is deferred until the cursor holder is available, so
+   * the buffer is not held during cursor-holder I/O.
+   */
+  public ListenableFuture<Sequence<ResultRow>> processAsync(
+      GroupByQuery query,
+      CursorFactory cursorFactory,
+      @Nullable TimeBoundaryInspector timeBoundaryInspector,
+      NonBlockingPool<ByteBuffer> bufferPool,
+      @Nullable GroupByQueryMetrics groupByQueryMetrics
+  )
+  {
+    validateForProcess(query, cursorFactory);
+    final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
+    return FutureUtils.transform(
+        cursorFactory.makeCursorHolderAsync(buildSpec),
+        cursorHolder -> processWithCursorHolder(

Review Comment:
   Won't this end up doing the main processing in whatever thread happened to 
resolve the `makeCursorHolderAsync` future? (Because `FutureUtils.transform` 
uses a direct executor.) Possibly that'd be in a virtual storage loader thread.
   
   I think we'll need to either adjust this method to accept a 
`ListenableExecutorService` that will be used to run `processWithCursorHolder`, 
or break it up so callers first call `groupingEngine.makeCursorHolderAsync` and 
then call `groupingEngine.processCursorHolder`.
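
   To illustrate the threading concern, here is a small analogy using the 
JDK's `CompletableFuture` instead of Druid's `FutureUtils` or Guava, so it 
runs standalone. The thread names and the class are made up for the example. 
A continuation attached without an executor runs on whichever thread completes 
the source future, while one attached with an explicit executor runs there 
instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TransformThreadExample
{
  // Returns the names of the threads that ran the direct and the
  // executor-bound continuation, in that order.
  static String[] run()
  {
    // "loader" plays the role of the thread that resolves the async future.
    final ExecutorService loader = Executors.newSingleThreadExecutor(r -> new Thread(r, "loader"));
    // "processing" plays the role of the executor supplied by the caller.
    final ExecutorService processing = Executors.newSingleThreadExecutor(r -> new Thread(r, "processing"));
    try {
      final CompletableFuture<String> source = new CompletableFuture<>();

      // Attached before completion with no executor: runs on the completing thread.
      final CompletableFuture<String> direct =
          source.thenApply(holder -> Thread.currentThread().getName());

      // Attached with an explicit executor: runs on that executor.
      final CompletableFuture<String> onExecutor =
          source.thenApplyAsync(holder -> Thread.currentThread().getName(), processing);

      // Complete the source from the loader thread.
      loader.execute(() -> source.complete("holder"));

      return new String[]{direct.join(), onExecutor.join()};
    }
    finally {
      loader.shutdown();
      processing.shutdown();
    }
  }
}
```

   Accepting an executor in `processAsync` would correspond to the second 
form; splitting the method in two leaves the choice of thread to the caller.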



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java:
##########
@@ -482,8 +484,46 @@ public Sequence<ResultRow> process(
       @Nullable GroupByQueryMetrics groupByQueryMetrics
   )
   {
-    final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
+    validateForProcess(query, cursorFactory);
+    final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
+    final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(buildSpec);
+    return processWithCursorHolder(query, cursorFactory, cursorHolder, timeBoundaryInspector, bufferPool, buildSpec);
+  }
+
+  /**
+   * Asynchronous variant of {@link #process} that obtains the {@link CursorHolder} from
+   * {@link CursorFactory#makeCursorHolderAsync} so callers running on threads that must not block on I/O
+   * (e.g. MSQ frame processors) can yield via {@link org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures}
+   * until the cursor holder is ready.

Review Comment:
   This javadoc feels a bit too specific to me. It's enough to say that this is 
an asynchronous variant of `process` that uses 
`CursorFactory#makeCursorHolderAsync` to avoid blocking on acquisition of 
`CursorHolder`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

