This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8ce2f36ce4d feat: add async CursorFactory API and migrate MSQ frame
processors to use it (#19397)
8ce2f36ce4d is described below
commit 8ce2f36ce4dcba7420cc47801ec23148854dde9a
Author: Clint Wylie <[email protected]>
AuthorDate: Fri May 8 03:39:31 2026 -0700
feat: add async CursorFactory API and migrate MSQ frame processors to use
it (#19397)
changes:
* add `AsyncCursorHolder` to manage async loading lifecycle for a cursor
holder until ownership of the `CursorHolder` it produces can be transferred to
the consumer (see javadoc for details)
* add `CursorFactory.makeCursorHolderAsync(CursorBuildSpec)` so that cursor
factories backed by partial downloads can do I/O without blocking processing
threads, with a default implementation returning
`AsyncCursorHolder.completed(makeCursorHolder(spec))` so existing
implementations remain async-correct without changes
* add `GroupingEngine.makeCursorHolderAsync` (returning `AsyncCursorHolder`)
and `GroupingEngine.processCursorHolder`, and extract a shared
`processWithCursorHolder` helper from `GroupingEngine.process()`, so that a
caller which can yield and then resume can wait for the `CursorHolder` to be
ready and later process it
* migrate `ScanQueryFrameProcessor.runWithSegment` to call
`makeCursorHolderAsync` and yield via `ReturnOrAwait` while the load is pending
* migrate `GroupByPreShuffleFrameProcessor.runWithSegment` cursor path to
call `GroupingEngine.makeCursorHolderAsync` and yield via `ReturnOrAwait` while
loading
---
.../groupby/GroupByPreShuffleFrameProcessor.java | 81 ++++--
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 49 +++-
.../querykit/scan/ScanQueryFrameProcessorTest.java | 183 ++++++++++++
.../apache/druid/query/groupby/GroupingEngine.java | 75 ++++-
.../apache/druid/segment/AsyncCursorHolder.java | 314 +++++++++++++++++++++
.../org/apache/druid/segment/CursorFactory.java | 13 +
.../druid/segment/AsyncCursorHolderTest.java | 230 +++++++++++++++
7 files changed, 906 insertions(+), 39 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index e960e9cff79..54f1eda6299 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.querykit.groupby;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.guava.FutureUtils;
@@ -58,8 +59,10 @@ import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
+import org.apache.druid.segment.AsyncCursorHolder;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.segment.TimeBoundaryInspector;
@@ -94,6 +97,18 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
private SegmentsInputSlice handedOffSegments = null;
private Yielder<Yielder<ResultRow>> currentResultsYielder;
private ListenableFuture<DataServerQueryResult<ResultRow>>
dataServerQueryResultFuture;
+ @Nullable
+ private CursorFactory currentCursorFactory;
+ @Nullable
+ private TimeBoundaryInspector currentTimeBoundaryInspector;
+ /**
+ * In-flight {@link GroupingEngine#makeCursorHolderAsync} handle for the
current segment, when {@link #resultYielder}
+ * has not yet been derived. Registered on {@link #closer} so the produced
{@link CursorHolder} is always disposed
+ * regardless of where the underlying load is in its lifecycle. Cleared
after we transfer ownership of the holder to
+ * {@link GroupingEngine#processCursorHolder} (which moves it onto the
resulting Sequence's baggage closer).
+ */
+ @Nullable
+ private AsyncCursorHolder asyncCursorHolder;
public GroupByPreShuffleFrameProcessor(
final GroupByQuery query,
@@ -185,28 +200,60 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder
segmentHolder) throws IOException
{
if (resultYielder == null) {
- final Segment segment = mapSegment(segmentHolder, closer);
- final TimeBoundaryInspector tbi =
segment.as(TimeBoundaryInspector.class);
- final Sequence<ResultRow> rowSequence;
+ if (asyncCursorHolder == null && currentCursorFactory == null) {
+ // First invocation for this segment: map it, check the TimeBoundary
fast path, otherwise kick off the async
+ // cursor-holder load and cache the cursor factory + time-boundary
inspector for the follow-up invocation.
+ final Segment segment = mapSegment(segmentHolder, closer);
+ currentTimeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
+
+ if (GroupByTimeBoundaryUtils.canUseTimeBoundaryInspector(
+ query,
+ currentTimeBoundaryInspector,
+ segmentHolder.getDescriptor()
+ )) {
+ // Resolve this query using the TimeBoundaryInspector, no need for a
cursor.
+ resultYielder = Yielders.each(
+ Sequences.simple(
+
List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query,
currentTimeBoundaryInspector))
+ )
+ );
+ } else {
+ currentCursorFactory =
Objects.requireNonNull(segment.as(CursorFactory.class));
+ // Resolve this query using a cursor.
+ final GroupByQuery segmentQuery = (GroupByQuery) query
+ .withQuerySegmentSpec(new
SpecificSegmentSpec(segmentHolder.getDescriptor()))
+ .optimizeForSegment(new
PerSegmentQueryOptimizationContext(segmentHolder.getDescriptor()));
+ asyncCursorHolder = closer.register(
+ groupingEngine.makeCursorHolderAsync(
+ segmentQuery,
+ currentCursorFactory,
+ null
+ )
+ );
+ }
+ }
- if (GroupByTimeBoundaryUtils.canUseTimeBoundaryInspector(query, tbi,
segmentHolder.getDescriptor())) {
- // Resolve this query using the TimeBoundaryInspector, no need for a
cursor.
- rowSequence =
Sequences.simple(List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query,
tbi)));
- } else {
- // Resolve this query using a cursor.
- final GroupByQuery segmentQuery = (GroupByQuery) query
- .withQuerySegmentSpec(new
SpecificSegmentSpec(segmentHolder.getDescriptor()))
- .optimizeForSegment(new
PerSegmentQueryOptimizationContext(segmentHolder.getDescriptor()));
- rowSequence = groupingEngine.process(
- segmentQuery,
- Objects.requireNonNull(segment.as(CursorFactory.class)),
- tbi,
+ if (asyncCursorHolder != null) {
+ if (!asyncCursorHolder.isReady()) {
+ final SettableFuture<?> awaitFuture = SettableFuture.create();
+ asyncCursorHolder.addReadyCallback(() -> awaitFuture.set(null));
+ return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture));
+ }
+ // The holder is ready, ownership of the holder transitions onto the
returned Sequence's baggage closer
+ final CursorHolder holder = asyncCursorHolder.release();
+ asyncCursorHolder = null;
+ // currentCursorFactory is non-null whenever asyncCursorHolder is
non-null (both are set together in the
+ // first-invocation branch above). The requireNonNull pins the
invariant for static analysis.
+ final Sequence<ResultRow> rowSequence =
groupingEngine.processCursorHolder(
+ query.withQuerySegmentSpec(new
SpecificSegmentSpec(segmentHolder.getDescriptor())),
+ Objects.requireNonNull(currentCursorFactory),
+ holder,
+ currentTimeBoundaryInspector,
bufferPool,
null
);
+ resultYielder = Yielders.each(rowSequence);
}
-
- resultYielder = Yielders.each(rowSequence);
}
populateFrameWriterAndFlushIfNeeded();
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 82e596b3e29..0935cf63c05 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -26,6 +26,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.guava.FutureUtils;
@@ -69,6 +70,7 @@ import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.SpecificSegmentSpec;
+import org.apache.druid.segment.AsyncCursorHolder;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorFactory;
@@ -118,6 +120,14 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
private final Closer closer = Closer.create();
private Cursor cursor;
+ /**
+ * In-flight {@link CursorFactory#makeCursorHolderAsync} handle for the
current segment, when {@link #cursor} has not
+ * yet been derived. Registered on {@link #closer} as soon as it is created
so the produced {@link CursorHolder} is
+ * always disposed regardless of where the underlying load is in its
lifecycle. Cleared after the holder is consumed
+ * and ownership transitions to {@link #cursorCloser}.
+ */
+ @Nullable
+ private AsyncCursorHolder asyncCursorHolder;
private ListenableFuture<DataServerQueryResult<Object[]>>
dataServerQueryResultFuture;
private Closeable cursorCloser;
/**
@@ -297,21 +307,36 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder
segmentHolder) throws IOException
{
if (cursor == null) {
- final Segment segment = mapSegment(segmentHolder, closer);
- final CursorFactory cursorFactory = segment.as(CursorFactory.class);
- if (cursorFactory == null) {
- throw DruidException.defensive(
- "Null cursor factory found. Probably trying to issue a query
against a segment being memory unmapped."
+ if (asyncCursorHolder == null) {
+ final Segment segment = mapSegment(segmentHolder, closer);
+ final CursorFactory cursorFactory = segment.as(CursorFactory.class);
+ if (cursorFactory == null) {
+ throw DruidException.defensive(
+ "Null cursor factory found. Probably trying to issue a query
against a segment being memory unmapped."
+ );
+ }
+
+ asyncCursorHolder = closer.register(
+ cursorFactory.makeCursorHolderAsync(
+ ScanQueryEngine.makeCursorBuildSpec(
+ query.withQuerySegmentSpec(new
SpecificSegmentSpec(segmentHolder.getDescriptor())),
+ null
+ )
+ )
);
}
- final CursorHolder nextCursorHolder =
- cursorFactory.makeCursorHolder(
- ScanQueryEngine.makeCursorBuildSpec(
- query.withQuerySegmentSpec(new
SpecificSegmentSpec(segmentHolder.getDescriptor())),
- null
- )
- );
+ if (!asyncCursorHolder.isReady()) {
+ final SettableFuture<?> awaitFuture = SettableFuture.create();
+ asyncCursorHolder.addReadyCallback(() -> awaitFuture.set(null));
+ return ReturnOrAwait.awaitAllFutures(ImmutableList.of(awaitFuture));
+ }
+
+ // Transfer ownership of the holder out of the AsyncCursorHolder;
setNextCursor manages the holder's lifecycle
+ // from here on. The wrapper stays registered on closer (close() is now
a no-op since release was called) so
+ // we don't need to track it further.
+ final CursorHolder nextCursorHolder = asyncCursorHolder.release();
+ asyncCursorHolder = null;
final Cursor nextCursor;
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index 8b5952adcaf..9922be0a6c2 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -44,29 +44,40 @@ import org.apache.druid.msq.querykit.ReadableInput;
import org.apache.druid.msq.querykit.SegmentReferenceHolder;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.AsyncCursorHolder;
+import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountedSegmentProvider;
+import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
+import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.jupiter.api.Assertions;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
{
@@ -262,4 +273,176 @@ public class ScanQueryFrameProcessorTest extends
FrameProcessorTestBase
+ "2011-01-02T00:00:00.000Z/2021-01-01T00:00:00.000Z]]"))
);
}
+
+ /**
+ * Verifies that {@link ScanQueryFrameProcessor#runWithSegment} yields via
{@link
+ * org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures} when
{@link CursorFactory#makeCursorHolderAsync}
+ * returns an {@link AsyncCursorHolder} that has not yet completed, and
resumes after it does. Exercises the partial
+ * / non-blocking I/O integration path on the MSQ side without requiring a
real partial segment.
+ */
+ @Test
+ public void test_runWithSegments_asyncCursorHolderAwaits() throws Exception
+ {
+ final QueryableIndex queryableIndex = TestIndex.getMMappedTestIndex();
+ final CursorFactory baseCursorFactory = new
QueryableIndexCursorFactory(queryableIndex);
+ final AsyncCursorHolder deferredAsyncHolder = new AsyncCursorHolder(null);
+
+ final CursorFactory deferredCursorFactory = new CursorFactory()
+ {
+ @Override
+ public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+ {
+ return baseCursorFactory.makeCursorHolder(spec);
+ }
+
+ @Override
+ public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ return deferredAsyncHolder;
+ }
+
+ @Override
+ public RowSignature getRowSignature()
+ {
+ return baseCursorFactory.getRowSignature();
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ return baseCursorFactory.getColumnCapabilities(column);
+ }
+ };
+
+ final QueryableIndexSegment baseSegment = new
QueryableIndexSegment(queryableIndex, SegmentId.dummy("test"));
+ final Segment wrappedSegment = new Segment()
+ {
+ @Override
+ public SegmentId getId()
+ {
+ return baseSegment.getId();
+ }
+
+ @Override
+ public Interval getDataInterval()
+ {
+ return baseSegment.getDataInterval();
+ }
+
+ @Override
+ public void validateOrElseThrow(PolicyEnforcer policyEnforcer)
+ {
+ baseSegment.validateOrElseThrow(policyEnforcer);
+ }
+
+ @Override
+ public boolean isTombstone()
+ {
+ return baseSegment.isTombstone();
+ }
+
+ @Override
+ public String getDebugString()
+ {
+ return baseSegment.getDebugString();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T as(@Nonnull Class<T> clazz)
+ {
+ if (CursorFactory.class.equals(clazz)) {
+ return (T) deferredCursorFactory;
+ }
+ return baseSegment.as(clazz);
+ }
+
+ @Override
+ public void close()
+ {
+ baseSegment.close();
+ }
+ };
+
+ final ScanQuery query =
+ Druids.newScanQueryBuilder()
+ .dataSource("test")
+ .intervals(new
MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.ETERNITY)))
+ .columns(baseCursorFactory.getRowSignature().getColumnNames())
+ .build();
+
+ final BlockingQueueFrameChannel outputChannel =
BlockingQueueFrameChannel.minimal();
+ final FrameWriterFactory frameWriterFactory = new
LimitedFrameWriterFactory(
+ FrameWriters.makeFrameWriterFactory(
+ FrameType.latestRowBased(),
+ new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+ baseCursorFactory.getRowSignature(),
+ Collections.emptyList(),
+ false
+ ),
+ 1
+ );
+
+ final ReferenceCountedSegmentProvider segmentReferenceProvider = new
ReferenceCountedSegmentProvider(wrappedSegment);
+ final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor(
+ query,
+ null,
+ new DefaultObjectMapper(),
+ ReadableInput.segment(
+ new SegmentReferenceHolder(
+ new SegmentReference(
+ SegmentId.dummy("test").toDescriptor(),
+ segmentReferenceProvider.acquireReference(),
+ null
+ ),
+ null,
+ null
+ )
+ ),
+ SegmentMapFunction.IDENTITY,
+ new ResourceHolder<>()
+ {
+ @Override
+ public WritableFrameChannel get()
+ {
+ return outputChannel.writable();
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ outputChannel.writable().close();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ },
+ new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {})
+ );
+
+ final ListenableFuture<Object> retVal = exec.runFully(processor, null);
+
+ // Processor should be awaiting the deferred holder and have produced no
rows yet.
+ Thread.sleep(200);
+ Assertions.assertFalse(retVal.isDone(), "processor should be awaiting the
deferred AsyncCursorHolder");
+ Assertions.assertFalse(outputChannel.readable().canRead(), "no frames
should have been written yet");
+
+ // Complete the load and verify the processor proceeds to produce all rows.
+
deferredAsyncHolder.set(baseCursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query,
null)));
+
+ final Sequence<List<Object>> rowsFromProcessor =
FrameTestUtil.readRowsFromFrameChannel(
+ outputChannel.readable(),
+ FrameReader.create(baseCursorFactory.getRowSignature())
+ );
+
+ FrameTestUtil.assertRowsEqual(
+ FrameTestUtil.readRowsFromCursorFactory(baseCursorFactory,
baseCursorFactory.getRowSignature(), false),
+ rowsFromProcessor
+ );
+
+ Assert.assertEquals(Unit.instance(), retVal.get(30, TimeUnit.SECONDS));
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
index 3f85bbf63e1..2b4ad7fefff 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
@@ -75,6 +75,7 @@ import
org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.LimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.AsyncCursorHolder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
@@ -482,8 +483,49 @@ public class GroupingEngine
@Nullable GroupByQueryMetrics groupByQueryMetrics
)
{
- final GroupByQueryConfig querySpecificConfig =
configSupplier.get().withOverrides(query);
+ validateForProcess(query, cursorFactory);
+ final CursorBuildSpec buildSpec = makeCursorBuildSpec(query,
groupByQueryMetrics);
+ final CursorHolder cursorHolder =
cursorFactory.makeCursorHolder(buildSpec);
+ return processWithCursorHolder(query, cursorFactory, cursorHolder,
timeBoundaryInspector, bufferPool, buildSpec);
+ }
+ /**
+ * Obtain an {@link AsyncCursorHolder} for this query's cursor build spec.
Pairs with {@link #processCursorHolder},
+ * allowing a non-blocking caller to call this, yield until the returned
holder is ready, and then call
+ * {@link #processCursorHolder} on its processing thread to run the actual
aggregation.
+ */
+ public AsyncCursorHolder makeCursorHolderAsync(
+ GroupByQuery query,
+ CursorFactory cursorFactory,
+ @Nullable GroupByQueryMetrics groupByQueryMetrics
+ )
+ {
+ validateForProcess(query, cursorFactory);
+ final CursorBuildSpec buildSpec = makeCursorBuildSpec(query,
groupByQueryMetrics);
+ return cursorFactory.makeCursorHolderAsync(buildSpec);
+ }
+
+ /**
+ * Run the aggregation against an already-loaded {@link CursorHolder}. The
caller is responsible for acquiring the
+ * holder (either directly from a {@link CursorFactory#makeCursorHolder} as
we do in {@link #process} or using
+ * {@link #makeCursorHolderAsync} + {@link AsyncCursorHolder#release})
+ */
+ public Sequence<ResultRow> processCursorHolder(
+ GroupByQuery query,
+ CursorFactory cursorFactory,
+ CursorHolder cursorHolder,
+ @Nullable TimeBoundaryInspector timeBoundaryInspector,
+ NonBlockingPool<ByteBuffer> bufferPool,
+ @Nullable GroupByQueryMetrics groupByQueryMetrics
+ )
+ {
+ validateForProcess(query, cursorFactory);
+ final CursorBuildSpec buildSpec = makeCursorBuildSpec(query,
groupByQueryMetrics);
+ return processWithCursorHolder(query, cursorFactory, cursorHolder,
timeBoundaryInspector, bufferPool, buildSpec);
+ }
+
+ private static void validateForProcess(GroupByQuery query, @Nullable
CursorFactory cursorFactory)
+ {
if (cursorFactory == null) {
throw new ISE(
"Null cursor factory found. Probably trying to issue a query against
a segment being memory unmapped."
@@ -494,21 +536,35 @@ public class GroupingEngine
if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", intervals);
}
+ }
- final ResourceHolder<ByteBuffer> bufferHolder = bufferPool.take();
+ private Sequence<ResultRow> processWithCursorHolder(
+ GroupByQuery query,
+ CursorFactory cursorFactory,
+ CursorHolder cursorHolder,
+ @Nullable TimeBoundaryInspector timeBoundaryInspector,
+ NonBlockingPool<ByteBuffer> bufferPool,
+ CursorBuildSpec buildSpec
+ )
+ {
+ // Register the cursor holder on the closer before any work that could
throw, so a single catch path covers
+ // every cleanup scenario (bufferPool.take() failure, pipeline
construction failure, etc.) and the original
+ // exception is preserved with any close errors as suppressed.
+ final Closer closer = Closer.create();
+ closer.register(cursorHolder);
- Closer closer = Closer.create();
- closer.register(bufferHolder);
+ final GroupByQueryConfig querySpecificConfig;
try {
- final String fudgeTimestampString =
query.context().getString(GroupingEngine.CTX_KEY_FUDGE_TIMESTAMP);
+ querySpecificConfig = configSupplier.get().withOverrides(query);
+
+ final ResourceHolder<ByteBuffer> bufferHolder = bufferPool.take();
+ closer.register(bufferHolder);
+ final String fudgeTimestampString =
query.context().getString(GroupingEngine.CTX_KEY_FUDGE_TIMESTAMP);
final DateTime fudgeTimestamp = fudgeTimestampString == null
? null
:
DateTimes.utc(Long.parseLong(fudgeTimestampString));
- final CursorBuildSpec buildSpec = makeCursorBuildSpec(query,
groupByQueryMetrics);
- final CursorHolder cursorHolder =
closer.register(cursorFactory.makeCursorHolder(buildSpec));
-
if (cursorHolder.isPreAggregated()) {
query =
query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated()));
}
@@ -546,8 +602,7 @@ public class GroupingEngine
return result.withBaggage(closer);
}
catch (Throwable e) {
- CloseableUtils.closeAndWrapExceptions(closer);
- throw e;
+ throw CloseableUtils.closeAndWrapInCatch(e, closer);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java
b/processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java
new file mode 100644
index 00000000000..e682f111619
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Closeable wrapper around an asynchronously-loaded {@link CursorHolder},
returned by
+ * {@link CursorFactory#makeCursorHolderAsync}. Designed to make lifecycle
management safe even when the holder is
+ * still loading: callers receive a single Closeable handle and can register
it once with their cleanup machinery,
+ * regardless of where the underlying load is in its lifecycle.
+ * <p>
+ * The hazard this exists to avoid: returning a {@code
ListenableFuture<CursorHolder>} (or similar future-of-Closeable)
+ * makes correct cleanup error-prone, where canceling the future or letting a
caller fail before consuming the future
+ * can orphan the produced holder, leaking the underlying resources. By
exposing a Closeable that internally tracks the
+ * load and disposes whatever has materialized, callers don't have to write
that bookkeeping themselves.
+ * <p>
+ * <h3>Producer protocol</h3>
+ * Producers feed results in via {@link #set(CursorHolder)} or {@link
#setException(Throwable)}, both of which return
+ * a boolean. If they return {@code false}, this wrapper has already been
closed and the producer is responsible for
+ * closing whatever holder it just produced.
+ * Producers may pass a {@link Runnable} canceler at construction time which
runs on {@link #close()} when the wrapper
+ * is closed before {@link #set} or {@link #setException} has been called, giving the producer an
opportunity to abort its work. The canceler
+ * is best-effort: a producer may have already produced the holder by the time
it observes cancellation, in which case
+ * its {@link #set} call will return false and it must close the holder it
tried to set.
+ * <p>
+ * <h3>Consumer protocol</h3>
+ * Consumers wait for {@link #isReady()} via {@link #addReadyCallback}, then
call {@link #release()} to transfer ownership of
+ * the {@link CursorHolder} (or rethrow the producer exception). Calling {@link
#release()} before {@link #isReady()}
+ * returns {@code true}, more than once, or after this wrapper has been closed
throws a {@link DruidException}.
+ * <p>
+ * For example (using {@link ReturnOrAwait} to show intended yield-then-resume
usage pattern):
+ * <pre>{@code
+ * if (asyncHolder == null) {
+ * asyncHolder = cursorFactory.makeCursorHolderAsync(spec);
+ * closer.register(asyncHolder); // safe at any lifecycle point, close()
handles in-flight loads
+ * }
+ * if (!asyncHolder.isReady()) {
+ * SettableFuture<?> awaitFuture = SettableFuture.create();
+ * asyncHolder.addReadyCallback(() -> awaitFuture.set(null));
+ * return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture));
+ * }
+ * final CursorHolder holder = asyncHolder.release(); // ownership transfers
to the caller
+ * // ... use holder; close it when done (or hand it to a component that owns
its lifecycle) ...
+ * }</pre>
+ */
+public class AsyncCursorHolder implements Closeable
+{
+ private static final Logger LOG = new Logger(AsyncCursorHolder.class);
+
+ /**
+ * Completed {@link AsyncCursorHolder} backed by an already available {@link
CursorHolder}
+ */
+ public static AsyncCursorHolder completed(CursorHolder holder)
+ {
+ final AsyncCursorHolder result = new AsyncCursorHolder(null);
+ result.set(holder);
+ return result;
+ }
+
+ @Nullable
+ private final Runnable canceler;
+
+ @GuardedBy("this")
+ @Nullable
+ private CursorHolder result = null;
+ @GuardedBy("this")
+ @Nullable
+ private Throwable error = null;
+ @GuardedBy("this")
+ private boolean closed = false;
+ @GuardedBy("this")
+ private boolean disposed = false;
+ @GuardedBy("this")
+ private final List<Runnable> readyCallbacks = new ArrayList<>();
+
+ /**
+ * @param canceler optional callback invoked from {@link #close()} when the
wrapper is closed before the load has
+ * completed ({@link #set} or {@link #setException}).
Producers that support cancellation should
+ * provide one; producers that don't can pass {@code null},
in which case {@link #close()} just stops
+ * observing the result.
+ */
+ public AsyncCursorHolder(@Nullable Runnable canceler)
+ {
+ this.canceler = canceler;
+ }
+
+ /**
+ * Allows producer to mark the load successful with the given holder.
Returns {@code true} if accepted, {@code false}
+ * if this wrapper has already been closed, in which case the producer is
responsible for closing {@link CursorHolder}
+ * itself. Throws {@link DruidException} if the load was already completed
(from prior calls to this method or
+ * {@link #setException}).
+ * <p>
+ * Callbacks registered via {@link #addReadyCallback} fire outside the lock
to avoid lock-ordering deadlocks and
+ * unbounded lock holds.
+ */
+ public boolean set(CursorHolder holder)
+ {
+ if (holder == null) {
+ throw DruidException.defensive("CursorHolder cannot be null");
+ }
+ return setInternal(Either.value(holder));
+ }
+
+ /**
+ * Allows producer to mark the load as failed. Returns {@code true} if
accepted, {@code false} if this wrapper has
+ * already been closed (no holder was produced, so there's nothing for the
producer to clean up). Throws
+ * {@link DruidException} if the load was already completed (from prior
calls to this method or {@link #set}).
+ * <p>
+ * Callbacks registered via {@link #addReadyCallback} fire outside the lock
to avoid lock-ordering deadlocks and
+ * unbounded lock holds.
+ */
+ public boolean setException(Throwable t)
+ {
+ return setInternal(Either.error(t));
+ }
+
+ private boolean setInternal(Either<Throwable, CursorHolder> value)
+ {
+ final List<Runnable> callbacksToFire;
+ synchronized (this) {
+ if (closed) {
+ return false;
+ }
+ if (result != null || error != null) {
+ throw DruidException.defensive("AsyncCursorHolder is already
completed");
+ }
+ if (value.isError()) {
+ error = value.error();
+ } else {
+ result = value.valueOrThrow();
+ }
+ callbacksToFire = drainCallbacks();
+ }
+ fireCallbacks(callbacksToFire);
+ return true;
+ }
+
+ /**
+ * Whether the load has completed (successfully or with failure). Once true,
stays true. Callers that need to wait
+ * for readiness without blocking the current thread should register a
{@link #addReadyCallback} and yield.
+ */
+ public synchronized boolean isReady()
+ {
+ return result != null || error != null;
+ }
+
+ /**
+ * Take ownership of the underlying {@link CursorHolder}. After this returns, {@link #close()} on this
+ * {@code AsyncCursorHolder} no longer closes the holder; the caller is responsible for closing the returned
+ * holder. Useful when passing the holder to another component (e.g. a cursor-lifecycle manager) that takes
+ * ownership of it.
+ * <p>
+ * Throws {@link DruidException} if this wrapper has been closed, the holder has already been released, or the
+ * holder is not yet ready. If the underlying load failed, the failure is rethrown: a {@link RuntimeException}
+ * or {@link Error} is thrown as-is, any other {@link Throwable} is wrapped in a {@link DruidException}. Does
+ * not block; callers must check {@link #isReady()} first (typically by yielding via a
+ * {@link #addReadyCallback}-driven wait pattern).
+ */
+ public synchronized CursorHolder release()
+ {
+ if (closed) {
+ throw DruidException.defensive("AsyncCursorHolder is already closed");
+ }
+ if (disposed) {
+ throw DruidException.defensive("AsyncCursorHolder has already been
released");
+ }
+ if (error != null) {
+ // unchecked failures pass through as is; the failure stays recorded, so a repeated call throws it again
+ if (error instanceof RuntimeException runtime) {
+ throw runtime;
+ } else if (error instanceof Error e) {
+ throw e;
+ }
+ // anything else is a checked throwable: wrap it, built from the original throwable and its message
+ throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+ .ofCategory(DruidException.Category.UNCATEGORIZED)
+ .build(error, error.getMessage());
+ }
+ if (result == null) {
+ throw DruidException.defensive("AsyncCursorHolder is not ready yet");
+ }
+ final CursorHolder resultToReturn = result;
+ // clear result so it can be eligible for gc; 'disposed' records that ownership has left this wrapper
+ result = null;
+ disposed = true;
+ return resultToReturn;
+ }
+
+ /**
+ * Register a callback to fire when {@link #isReady()} becomes true (whether the load succeeded or failed). If
+ * the load has already completed, including when the holder has since been handed off via
+ * {@link #release()}, the callback fires synchronously on the calling thread. Otherwise it fires on whatever
+ * thread invokes {@link #set} or {@link #setException}, outside the wrapper's lock so the callback may safely
+ * re-enter the wrapper. Multiple callbacks may be registered; each fires once.
+ * <p>
+ * NOTE(review): a callback registered on a wrapper that was closed before the load completed is queued but
+ * does not appear to be fired by anything, because completion attempts are rejected once closed.
+ */
+ public void addReadyCallback(Runnable callback)
+ {
+ final boolean fireImmediately;
+ synchronized (this) {
+ // release() clears 'result', so 'disposed' must also count as completed; otherwise a callback registered
+ // after release() would be queued even though the load has already finished.
+ if (result != null || error != null || disposed) {
+ fireImmediately = true;
+ } else {
+ readyCallbacks.add(callback);
+ fireImmediately = false;
+ }
+ }
+ // Run outside the lock so the callback may re-enter this wrapper.
+ if (fireImmediately) {
+ callback.run();
+ }
+ }
+
+ /**
+ * Close the wrapper. May be called at any lifecycle point, but only once:
+ * <ul>
+ * <li>Already-loaded: closes the underlying {@link CursorHolder} immediately; a failure while closing it is
+ * swallowed.</li>
+ * <li>Loading in progress: invokes the canceler (if one was supplied at construction). The producer may still
+ * call {@link #set} / {@link #setException} after this; such a call returns false, and a producer that
+ * called {@code set} with a holder is responsible for closing it.</li>
+ * <li>Load failed: no-op (no holder was produced).</li>
+ * <li>Already released: no-op.</li>
+ * <li>Already closed: throws {@link DruidException}.</li>
+ * </ul>
+ * The holder close and the canceler both run after the wrapper's lock has been released.
+ */
+ @Override
+ public void close()
+ {
+ final CursorHolder holderToClose;
+ final Runnable cancelerToRun;
+ synchronized (this) {
+ if (closed) {
+ throw DruidException.defensive("Already closed");
+ }
+ closed = true;
+ if (disposed) {
+ // Ownership was already transferred via release(); the caller manages the holder lifecycle.
+ return;
+ }
+ if (result != null) {
+ // Result is here and no one has released it; we close it.
+ disposed = true;
+ holderToClose = result;
+ cancelerToRun = null;
+ } else if (error != null) {
+ // Load already failed; nothing to dispose.
+ holderToClose = null;
+ cancelerToRun = null;
+ } else {
+ // Load not yet completed; signal cancellation to the producer (if any).
+ holderToClose = null;
+ cancelerToRun = canceler;
+ }
+ }
+ if (holderToClose != null) {
+ try {
+ holderToClose.close();
+ }
+ catch (Throwable ignored) {
+ // Best-effort cleanup. NOTE(review): unlike the canceler path below, this failure is not logged.
+ }
+ }
+ if (cancelerToRun != null) {
+ try {
+ cancelerToRun.run();
+ }
+ catch (Throwable t) {
+ // Best-effort cancel
+ LOG.warn(t, "AsyncCursorHolder canceler exception");
+ }
+ }
+ }
+
+ /**
+ * Returns an unmodifiable snapshot of the callbacks registered so far and empties the pending list, so each
+ * callback is handed out at most once. Must be called while holding this object's lock.
+ */
+ @GuardedBy("this")
+ private List<Runnable> drainCallbacks()
+ {
+ // Copy first, then clear: the snapshot must not be a view of the list that is about to be emptied.
+ final List<Runnable> snapshot = List.copyOf(readyCallbacks);
+ readyCallbacks.clear();
+ return snapshot;
+ }
+
+ /**
+ * Runs every callback in iteration order. A callback that throws is logged and does not prevent the
+ * remaining callbacks from running.
+ */
+ private static void fireCallbacks(List<Runnable> callbacks)
+ {
+ callbacks.forEach(callback -> {
+ try {
+ callback.run();
+ }
+ catch (Throwable failure) {
+ // Best-effort; a misbehaving callback must not starve the ones after it.
+ LOG.warn(failure, "AsyncCursorHolder callback exception");
+ }
+ });
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
index 2effd1b5fa2..4a79490adc3 100644
--- a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
@@ -32,6 +32,19 @@ public interface CursorFactory extends ColumnInspector
*/
CursorHolder makeCursorHolder(CursorBuildSpec spec);
+ /**
+ * Asynchronous variant of {@link #makeCursorHolder(CursorBuildSpec)}, meant for cursor factories that may have to
+ * do I/O (e.g. download column data from deep storage) before a cursor can be served. Callers running on threads
+ * that must not block should prefer this method.
+ * <p>
+ * By default the work is done by {@link #makeCursorHolder(CursorBuildSpec)} on the calling thread and the result is returned already completed, so existing implementations need no changes.
+ */
+ default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ final CursorHolder holder = makeCursorHolder(spec);
+ return AsyncCursorHolder.completed(holder);
+ }
+
/**
* Returns the {@link RowSignature} of the data available from this cursor
factory. For mutable segments, even though
* the signature may evolve over time, any particular object returned by
this method is an immutable snapshot.
diff --git
a/processing/src/test/java/org/apache/druid/segment/AsyncCursorHolderTest.java
b/processing/src/test/java/org/apache/druid/segment/AsyncCursorHolderTest.java
new file mode 100644
index 00000000000..409f6b6a16b
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/AsyncCursorHolderTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.error.DruidException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Lifecycle tests for {@link AsyncCursorHolder}: ownership hand-off between {@code release()} and
+ * {@code close()}, rejection of producer results that arrive after close, canceler invocation, failure
+ * propagation, double-completion guards, and ready-callback firing.
+ */
+class AsyncCursorHolderTest
+{
+ @Test
+ void testCloseAfterReleaseDoesNotDoubleCloseHolder()
+ {
+ final CountingCursorHolder holder = new CountingCursorHolder();
+ final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(holder);
+
+ final CursorHolder released = asyncHolder.release();
+ Assertions.assertSame(holder, released);
+ Assertions.assertEquals(0, holder.closeCount(), "release should not close
the holder");
+
+ asyncHolder.close();
+ Assertions.assertEquals(0, holder.closeCount(), "close after release must
not double-close the holder");
+ }
+
+ @Test
+ void testCloseWhenAlreadyReadyClosesHolder()
+ {
+ final CountingCursorHolder holder = new CountingCursorHolder();
+ final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(holder);
+
+ asyncHolder.close();
+ Assertions.assertEquals(1, holder.closeCount(), "close should close the
holder when ready and not released");
+ }
+
+ @Test
+ void testCloseMultiple()
+ {
+ final CountingCursorHolder holder = new CountingCursorHolder();
+ final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(holder);
+
+ // A second close is rejected with an exception and must not close the holder a second time.
+ asyncHolder.close();
+ Throwable t = Assertions.assertThrows(DruidException.class,
asyncHolder::close);
+ Assertions.assertEquals(1, holder.closeCount());
+ Assertions.assertEquals("Already closed", t.getMessage());
+ }
+
+ @Test
+ void testCloseBeforeSetInvokesCancelerAndProducerClosesOrphan()
+ {
+ final AtomicInteger cancelerCalls = new AtomicInteger();
+ final AsyncCursorHolder asyncHolder = new
AsyncCursorHolder(cancelerCalls::incrementAndGet);
+
+ Assertions.assertFalse(asyncHolder.isReady());
+ asyncHolder.close();
+ Assertions.assertEquals(1, cancelerCalls.get(), "close before completion
should invoke the canceler");
+
+ // Producer races and produces a holder anyway: set should return false;
+ // producer must close the orphan.
+ final CountingCursorHolder lateHolder = new CountingCursorHolder();
+ final boolean accepted = asyncHolder.set(lateHolder);
+ Assertions.assertFalse(accepted, "set should be rejected after close");
+ Assertions.assertEquals(0, lateHolder.closeCount(), "wrapper does NOT
close orphan; producer is responsible");
+ }
+
+ @Test
+ void testLateProducerSetAfterCloseClosesOrphan()
+ {
+ // Simulates a producer (e.g. a future-based loader) that delivers a holder after the wrapper has been
+ // closed: set returns false, which is the producer's cue to close the orphan itself. Despite the method
+ // name, the producer-side close is not performed here; only the wrapper's behavior is asserted.
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+ asyncHolder.close();
+
+ final CountingCursorHolder lateHolder = new CountingCursorHolder();
+ final boolean accepted = asyncHolder.set(lateHolder);
+ Assertions.assertFalse(accepted, "set should be rejected after close");
+ // (Producer-side responsibility — wrapper does not close on the producer's behalf.)
+ Assertions.assertEquals(0, lateHolder.closeCount(), "wrapper does NOT
close orphan; producer is responsible");
+ }
+
+ @Test
+ void testNoCancelerIsCalledIfLoadAlreadyCompleted()
+ {
+ final AtomicInteger cancelerCalls = new AtomicInteger();
+ final AsyncCursorHolder asyncHolder = new
AsyncCursorHolder(cancelerCalls::incrementAndGet);
+ asyncHolder.set(new CountingCursorHolder());
+
+ asyncHolder.close();
+ Assertions.assertEquals(0, cancelerCalls.get(), "canceler must not run if
the load already completed");
+ }
+
+ @Test
+ void testReleaseBeforeReadyThrows()
+ {
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+ Assertions.assertThrows(DruidException.class, asyncHolder::release);
+ }
+
+ @Test
+ void testReleaseAfterReleaseThrows()
+ {
+ final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(new
CountingCursorHolder());
+ asyncHolder.release();
+ Assertions.assertThrows(DruidException.class, asyncHolder::release);
+ }
+
+ @Test
+ void testReleaseAfterCloseThrows()
+ {
+ final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(new
CountingCursorHolder());
+ asyncHolder.close();
+ Assertions.assertThrows(DruidException.class, asyncHolder::release);
+ }
+
+ @Test
+ void testReleaseAfterFailedLoadThrowsWrappedFailure()
+ {
+ // Despite the method name, a RuntimeException failure is expected to surface as the very same instance
+ // (see the assertSame below), not wrapped in another exception.
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+ final IllegalArgumentException failure = new
IllegalArgumentException("boom");
+ asyncHolder.setException(failure);
+
+ final IllegalArgumentException thrown =
Assertions.assertThrows(IllegalArgumentException.class, asyncHolder::release);
+ Assertions.assertSame(failure, thrown, "release should propagate the
underlying failure");
+ }
+
+ @Test
+ void testSetAfterSetThrows()
+ {
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+ asyncHolder.set(new CountingCursorHolder());
+ Assertions.assertThrows(DruidException.class, () -> asyncHolder.set(new
CountingCursorHolder()));
+ }
+
+ @Test
+ void testSetAfterSetExceptionThrows()
+ {
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+ asyncHolder.setException(new RuntimeException("boom"));
+ Assertions.assertThrows(DruidException.class, () -> asyncHolder.set(new
CountingCursorHolder()));
+ }
+
+ @Test
+ void testAddReadyCallbackFiresImmediatelyWhenAlreadyReady()
+ {
+ final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(new
CountingCursorHolder());
+ final AtomicInteger fired = new AtomicInteger();
+ asyncHolder.addReadyCallback(fired::incrementAndGet);
+ Assertions.assertEquals(1, fired.get(), "callback should fire
synchronously when already ready");
+ }
+
+ @Test
+ void testAddReadyCallbackFiresOnSet()
+ {
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+ final AtomicInteger fired = new AtomicInteger();
+ asyncHolder.addReadyCallback(fired::incrementAndGet);
+ Assertions.assertEquals(0, fired.get(), "callback should not fire before
completion");
+
+ asyncHolder.set(new CountingCursorHolder());
+ Assertions.assertEquals(1, fired.get(), "callback should fire when set is
called");
+ }
+
+ @Test
+ void testAddReadyCallbackFiresOnSetException()
+ {
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+ final AtomicInteger fired = new AtomicInteger();
+ asyncHolder.addReadyCallback(fired::incrementAndGet);
+ Assertions.assertEquals(0, fired.get(), "callback should not fire before
completion");
+
+ asyncHolder.setException(new RuntimeException("boom"));
+ Assertions.assertEquals(1, fired.get(), "callback should fire when
setException is called");
+ }
+
+ @Test
+ void testMultipleCallbacksAllFire()
+ {
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+ final AtomicInteger fired = new AtomicInteger();
+ asyncHolder.addReadyCallback(fired::incrementAndGet);
+ asyncHolder.addReadyCallback(fired::incrementAndGet);
+ asyncHolder.addReadyCallback(fired::incrementAndGet);
+
+ asyncHolder.set(new CountingCursorHolder());
+ Assertions.assertEquals(3, fired.get(), "all registered callbacks should
fire");
+ }
+
+ /**
+ * Minimal {@link CursorHolder} that just counts close invocations. {@link #asCursor()} throws because the
+ * tests never ask for a cursor. The counter is a plain field, which is sufficient because every test here
+ * drives the holder from a single thread.
+ */
+ private static class CountingCursorHolder implements CursorHolder
+ {
+ private int closeCount;
+
+ int closeCount()
+ {
+ return closeCount;
+ }
+
+ @Override
+ public Cursor asCursor()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+ closeCount++;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]