This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7f9e81b8c07 MSQ: Push down limits in ScanQueryFrameProcessor. (#18441)
7f9e81b8c07 is described below
commit 7f9e81b8c0777c238eb51db039cc442ef74c5631
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Sep 5 15:31:02 2025 -0700
MSQ: Push down limits in ScanQueryFrameProcessor. (#18441)
When the scan query's ORDER BY is a prefix of the underlying cursor's
ordering, stop reading from that cursor once offset + limit rows have been read.
---
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 106 +++++++++++++++++----
1 file changed, 87 insertions(+), 19 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index f000b13208d..a17c38cbf6b 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -64,6 +64,7 @@ import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.query.Druids;
import org.apache.druid.query.IterableRowsCursorHelper;
import org.apache.druid.query.Order;
+import org.apache.druid.query.OrderBy;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
@@ -101,8 +102,14 @@ import java.util.stream.Collectors;
public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
{
private static final Logger log = new Logger(ScanQueryFrameProcessor.class);
+ private static final int NO_LIMIT = -1;
+
private final ScanQuery query;
- private final AtomicLong runningCountForLimit;
+ /**
+ * Running count of rows emitted, shared across all processors generated by
the same {@link ScanQueryStageProcessor}.
+ * Only set when {@link ScanQuery#getOrderBys()} is empty.
+ */
+ private final AtomicLong sharedRunningCountForLimit;
private final ObjectMapper jsonMapper;
private final SettableLongVirtualColumn partitionBoostVirtualColumn;
private final VirtualColumns frameWriterVirtualColumns;
@@ -117,9 +124,19 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
private long currentAllocatorCapacity; // Used for generating
FrameRowTooLargeException if needed
private SegmentsInputSlice handedOffSegments = null;
+ /**
+ * Number of rows read so far from the current cursor.
+ */
+ private long cursorRowsRead;
+
+ /**
+ * Limit to be pushed down into the current cursor, or a negative number if
no limit is pushed down.
+ */
+ private long cursorPushDownLimit = NO_LIMIT;
+
public ScanQueryFrameProcessor(
final ScanQuery query,
- @Nullable final AtomicLong runningCountForLimit,
+ @Nullable final AtomicLong sharedRunningCountForLimit,
final ObjectMapper jsonMapper,
final ReadableInput baseInput,
final SegmentMapFunction segmentMapFn,
@@ -134,7 +151,7 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
frameWriterFactoryHolder
);
this.query = query;
- this.runningCountForLimit = runningCountForLimit;
+ this.sharedRunningCountForLimit = sharedRunningCountForLimit;
this.jsonMapper = jsonMapper;
this.partitionBoostVirtualColumn = new
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
@@ -154,8 +171,12 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
@Override
public ReturnOrAwait<Object> runIncrementally(final IntSet readableInputs)
throws IOException
{
- if (runningCountForLimit != null
- && runningCountForLimit.get() > query.getScanRowsOffset() +
query.getScanRowsLimit()) {
+ if (sharedRunningCountForLimit != null
+ && sharedRunningCountForLimit.get() >= getQueryOffsetPlusLimit()) {
+ return ReturnOrAwait.returnObject(Unit.instance());
+ }
+
+ if ((cursorPushDownLimit >= 0 && cursorRowsRead >= cursorPushDownLimit)) {
return ReturnOrAwait.returnObject(Unit.instance());
}
@@ -244,7 +265,7 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
cursorYielder.close();
return ReturnOrAwait.returnObject(handedOffSegments);
} else {
- final long rowsFlushed = setNextCursor(cursorYielder.get(), null,
null);
+ final long rowsFlushed = setNextCursor(cursorYielder.get(),
Collections.emptyList(), null, null);
closer.register(cursorYielder);
if (rowsFlushed > 0) {
return ReturnOrAwait.runAgain();
@@ -307,7 +328,12 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
nextCursorHolder.close();
return ReturnOrAwait.returnObject(Unit.instance());
} else {
- final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder,
segmentHolder.get().getSegment());
+ final long rowsFlushed = setNextCursor(
+ nextCursor,
+ nextCursorHolder.getOrdering(),
+ nextCursorHolder,
+ segmentHolder.get().getSegment()
+ );
assert rowsFlushed == 0; // There's only ever one cursor when running
with a segment
}
}
@@ -359,7 +385,12 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
nextCursorHolder.close();
return ReturnOrAwait.returnObject(Unit.instance());
}
- final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder,
frameSegment);
+ final long rowsFlushed = setNextCursor(
+ nextCursor,
+ nextCursorHolder.getOrdering(),
+ nextCursorHolder,
+ frameSegment
+ );
if (rowsFlushed > 0) {
return ReturnOrAwait.runAgain();
@@ -414,23 +445,34 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
createFrameWriterIfNeeded();
while (!cursor.isDone()) {
- if (!frameWriter.addSelection()) {
- if (frameWriter.getNumRows() > 0) {
- final long numRowsWritten = flushFrameWriter();
+ boolean flush;
- if (runningCountForLimit != null) {
- runningCountForLimit.addAndGet(numRowsWritten);
- }
+ if (frameWriter.addSelection()) {
+ cursorRowsRead++;
+ cursor.advance();
+ cursorOffset.increment();
+
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() +
1);
- return;
- } else {
+ // Flush if we reached cursorPushDownLimit.
+ flush = cursorPushDownLimit >= 0 && cursorRowsRead >=
cursorPushDownLimit;
+ } else {
+ // addSelection failed because the frame is full.
+ if (frameWriter.getNumRows() == 0) {
throw new FrameRowTooLargeException(currentAllocatorCapacity);
}
+
+ flush = true;
}
- cursor.advance();
- cursorOffset.increment();
-
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() +
1);
+ if (flush) {
+ final long numRowsWritten = flushFrameWriter();
+
+ if (sharedRunningCountForLimit != null) {
+ sharedRunningCountForLimit.addAndGet(numRowsWritten);
+ }
+
+ break;
+ }
}
}
@@ -465,6 +507,7 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
private long setNextCursor(
final Cursor cursor,
+ final List<OrderBy> ordering,
@Nullable final Closeable cursorCloser,
final Segment segment
) throws IOException
@@ -479,6 +522,16 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
this.cursorCloser = cursorCloser;
this.segment = segment;
this.cursorOffset.reset();
+ this.cursorRowsRead = 0;
+
+ if (query.isLimited()
+ && ordering.size() >= query.getOrderBys().size()
+ && query.getOrderBys().equals(ordering.subList(0,
query.getOrderBys().size()))) {
+ cursorPushDownLimit = getQueryOffsetPlusLimit();
+ } else {
+ cursorPushDownLimit = NO_LIMIT;
+ }
+
return rowsFlushed;
}
@@ -497,4 +550,19 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
}
return baseColumnSelectorFactory;
}
+
+ /**
+ * Returns {@link ScanQuery#getScanRowsOffset()} plus {@link ScanQuery#getScanRowsLimit()}, saturating at
+ * {@link Long#MAX_VALUE} if the addition overflows. Used as the threshold for the shared running row count in
+ * runIncrementally and as the per-cursor limit pushed down by setNextCursor.
+ */
+ private long getQueryOffsetPlusLimit()
+ {
+ final long sum = query.getScanRowsOffset() + query.getScanRowsLimit();
+ if (sum < 0) {
+ // Negative sum means signed overflow; assumes offset and limit are both non-negative (TODO confirm).
+ return Long.MAX_VALUE;
+ } else {
+ return sum;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]