This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 14059b375eb Apply min/max segment pruning to filtered selection ORDER
BY col LIMIT n (#18692)
14059b375eb is described below
commit 14059b375eb2743ed7d9340364b7c5cc51e7d058
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Jun 10 13:04:49 2026 +0200
Apply min/max segment pruning to filtered selection ORDER BY col LIMIT n
(#18692)
---
.../query/pruner/SelectionQuerySegmentPruner.java | 239 ++++++++++++++++-
.../pruner/SelectionQuerySegmentPrunerTest.java | 286 +++++++++++++++++++++
.../BenchmarkSelectionOrderByFilterPruning.java | 210 +++++++++++++++
3 files changed, 725 insertions(+), 10 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
index 6f2e29a2240..993050d7898 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
@@ -23,11 +23,20 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.common.request.context.predicate.RangePredicate;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -37,8 +46,14 @@ import org.apache.pinot.spi.env.PinotConfiguration;
* <li>For selection query with LIMIT 0, keep 1 segment to create the data
schema</li>
* <li>For selection only query without filter, keep enough documents to
fulfill the LIMIT requirement</li>
* <li>
- * For selection order-by query without filer, if the first order-by
expression is an identifier (column), prune
- * segments based on the column min/max value and keep enough documents to
fulfill the LIMIT and OFFSET requirement.
+ * For selection order-by query, if the first order-by expression is an
identifier (column), prune segments based on
+ * the column min/max value and keep enough documents to fulfill the LIMIT
and OFFSET requirement. This works both
+ * without a filter and with a filter: with a filter, each segment
contributes towards the LIMIT only the number of
+ * rows that <em>provably</em> match the filter based on min/max metadata
(its total docs if it fully matches, 0
+ * otherwise). Using this lower bound on matching rows keeps the boundary
safe, so segments are never pruned when
+ * they might still hold a top-n matching row. The optimization is skipped
when null handling is active for the
+ * order-by/predicate columns because nulls are stored as a default value
that pollutes the min/max metadata
+ * (see #18685).
* </li>
* </ul>
*/
@@ -51,10 +66,24 @@ public class SelectionQuerySegmentPruner implements
SegmentPruner {
@Override
public boolean isApplicableTo(QueryContext query) {
- // Only prune selection queries
- // If LIMIT is not 0, only prune segments for selection queries without
filter
- return QueryContextUtils.isSelectionQuery(query)
- && (query.getFilter() == null || query.getLimit() == 0);
+ if (!QueryContextUtils.isSelectionQuery(query)) {
+ return false;
+ }
+ // Without a filter (or for LIMIT 0, where we just keep one segment for
the schema), pruning is always applicable.
+ if (query.getFilter() == null || query.getLimit() == 0) {
+ return true;
+ }
+ // With a filter, only the order-by-on-identifier path can prune safely
(the selection-only path relies on exact
+ // doc counts, which a filter invalidates). Additionally, null handling
must not be active for the order-by column,
+ // because nulls are stored as a default value that pollutes the column
min/max metadata used for sorting and the
+ // boundary.
+ List<OrderByExpressionContext> orderByExpressions =
query.getOrderByExpressions();
+ if (orderByExpressions == null) {
+ return false;
+ }
+ ExpressionContext firstOrderByExpression =
orderByExpressions.get(0).getExpression();
+ return firstOrderByExpression.getType() ==
ExpressionContext.Type.IDENTIFIER
+ && !isNullHandlingActive(query,
firstOrderByExpression.getIdentifier());
}
@Override
@@ -75,7 +104,9 @@ public class SelectionQuerySegmentPruner implements
SegmentPruner {
}
if (query.getOrderByExpressions() == null) {
- return pruneSelectionOnly(segments, query);
+ // Count-based selection-only pruning is only safe without a filter
(total docs is an exact match count). With a
+ // filter present this path is not selected (see isApplicableTo); guard
defensively in case it is reached.
+ return query.getFilter() == null ? pruneSelectionOnly(segments, query) :
segments;
} else {
return pruneSelectionOrderBy(segments, query);
}
@@ -100,7 +131,7 @@ public class SelectionQuerySegmentPruner implements
SegmentPruner {
}
/**
- * Helper method to prune segments for selection order-by queries without
filter.
+ * Helper method to prune segments for selection order-by queries.
* <p>When the first order-by expression is an identifier (column), we can
prune segments based on the column min/max
* value:
* <ul>
@@ -108,6 +139,8 @@ public class SelectionQuerySegmentPruner implements
SegmentPruner {
* <li>2. Pick the top segments until we get enough documents to fulfill
the LIMIT and OFFSET requirement</li>
* <li>3. Keep the segments that has value overlap with the selected ones;
remove the others</li>
* </ul>
+ * <p>Each segment contributes towards the LIMIT only its {@link
#guaranteedMatchingDocs} (a lower bound on its
+ * matching rows), so the optimization remains correct when a filter is
present.
*/
private List<IndexSegment> pruneSelectionOrderBy(List<IndexSegment>
segments, QueryContext query) {
List<OrderByExpressionContext> orderByExpressions =
query.getOrderByExpressions();
@@ -157,7 +190,7 @@ public class SelectionQuerySegmentPruner implements
SegmentPruner {
IndexSegment segment = segments.get(minMaxValue._index);
if (remainingDocs > 0) {
selectedSegments.add(segment);
- remainingDocs -= segment.getSegmentMetadata().getTotalDocs();
+ remainingDocs -= guaranteedMatchingDocs(segment, query);
maxValue = minMaxValue._maxValue;
} else {
// After getting enough documents, prune all the segments with min
value larger than the current max value, or
@@ -184,7 +217,7 @@ public class SelectionQuerySegmentPruner implements
SegmentPruner {
IndexSegment segment = segments.get(minMaxValue._index);
if (remainingDocs > 0) {
selectedSegments.add(segment);
- remainingDocs -= segment.getSegmentMetadata().getTotalDocs();
+ remainingDocs -= guaranteedMatchingDocs(segment, query);
minValue = minMaxValue._minValue;
} else {
// After getting enough documents, prune all the segments with max
value smaller than the current min value,
@@ -201,6 +234,192 @@ public class SelectionQuerySegmentPruner implements
SegmentPruner {
return selectedSegments;
}
+ /**
+ * Returns a lower bound on the number of rows in the segment that match the
query filter, used to decide how many
+ * leading segments must be kept to guarantee the LIMIT + OFFSET requirement.
+ * <ul>
+ * <li>Without a filter, every row matches, so this is the exact total doc
count.</li>
+ * <li>With a filter, this is the total doc count if the segment
<em>provably</em> matches the filter for all of its
+ * rows (based on min/max metadata), and 0 otherwise. Using 0 for segments
that only partially (or not provably
+ * fully) match is a safe under-count: such segments are still kept (they
overlap the boundary), but they never let
+ * the boundary advance past rows they might contain.</li>
+ * </ul>
+ */
+ private long guaranteedMatchingDocs(IndexSegment segment, QueryContext
query) {
+ int totalDocs = segment.getSegmentMetadata().getTotalDocs();
+ FilterContext filter = query.getFilter();
+ if (filter == null) {
+ return totalDocs;
+ }
+ return fullyMatches(segment, filter, query) ? totalDocs : 0;
+ }
+
+ /**
+ * Returns {@code true} only if <em>all</em> rows of the segment provably
satisfy the filter, based on min/max
+ * metadata. A {@code false} result never means "does not match"; it means
"cannot prove that all rows match", which
+ * is always safe to treat as a 0 lower bound. NOT and unsupported
predicates conservatively return {@code false}.
+ */
+ private boolean fullyMatches(IndexSegment segment, FilterContext filter,
QueryContext query) {
+ switch (filter.getType()) {
+ case AND:
+ assert filter.getChildren() != null;
+ for (FilterContext child : filter.getChildren()) {
+ if (!fullyMatches(segment, child, query)) {
+ return false;
+ }
+ }
+ return true;
+ case OR:
+ assert filter.getChildren() != null;
+ for (FilterContext child : filter.getChildren()) {
+ if (fullyMatches(segment, child, query)) {
+ return true;
+ }
+ }
+ return false;
+ case CONSTANT:
+ return filter.isConstantTrue();
+ case PREDICATE:
+ return predicateFullyMatches(segment, filter.getPredicate(), query);
+ case NOT:
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Returns {@code true} only if all rows of the segment provably satisfy the
predicate, based on the predicate
+ * column's min/max metadata. Only {@code RANGE} (e.g. {@code >, >=, <, <=}),
{@code EQ} ({@code =}) and
+ * {@code NOT_EQ} ({@code <>}) predicates on an identifier column for which
null handling is not active are
+ * supported; everything else conservatively returns {@code false}.
+ */
+ private boolean predicateFullyMatches(IndexSegment segment, Predicate
predicate, QueryContext query) {
+ ExpressionContext lhs = predicate.getLhs();
+ if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
+ return false;
+ }
+ String column = lhs.getIdentifier();
+ // Nulls are stored as a default value that pollutes the column min/max
metadata (and are excluded from comparisons
+ // under null handling), so full-match cannot be reasoned about when null
handling is active for the column.
+ if (isNullHandlingActive(query, column)) {
+ return false;
+ }
+ DataSource dataSource = segment.getDataSource(column, query.getSchema());
+ DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
+ Comparable minValue = dataSourceMetadata.getMinValue();
+ Comparable maxValue = dataSourceMetadata.getMaxValue();
+ if (minValue == null || maxValue == null) {
+ return false;
+ }
+ // NaN (FLOAT/DOUBLE) breaks the ordering assumptions:
Float/Double.compareTo treats NaN as the largest value, which
+ // does not match filter semantics (comparisons against NaN are never
true). A NaN min/max could make a segment look
+ // fully-matching when its NaN rows actually do not match, so refuse to
reason about full match in that case.
+ if (isNaN(minValue) || isNaN(maxValue)) {
+ return false;
+ }
+ DataType dataType = dataSourceMetadata.getDataType();
+ try {
+ switch (predicate.getType()) {
+ case RANGE:
+ return rangeFullyMatches((RangePredicate) predicate, minValue,
maxValue, dataType);
+ case EQ: {
+ Comparable value = convertValue(((EqPredicate)
predicate).getValue(), dataType);
+ // All rows equal the value iff the whole segment collapses to that
single value.
+ return minValue.compareTo(value) == 0 && maxValue.compareTo(value)
== 0;
+ }
+ case NOT_EQ: {
+ Comparable value = convertValue(((NotEqPredicate)
predicate).getValue(), dataType);
+ // All rows differ from the value iff the value lies outside [min,
max].
+ return value.compareTo(minValue) < 0 || value.compareTo(maxValue) >
0;
+ }
+ default:
+ return false;
+ }
+ } catch (Exception e) {
+ // Different data types / unparseable literal: cannot prove full match.
+ return false;
+ }
+ }
+
+ /**
+ * Returns {@code true} if the segment's whole {@code [minValue, maxValue]}
range provably satisfies the range
+ * predicate (i.e. it is fully contained within the predicate's bounds).
+ */
+ private boolean rangeFullyMatches(RangePredicate predicate, Comparable
minValue, Comparable maxValue,
+ DataType dataType) {
+ String lowerBound = predicate.getLowerBound();
+ if (!lowerBound.equals(RangePredicate.UNBOUNDED)) {
+ Comparable lowerBoundValue = convertValue(lowerBound, dataType);
+ if (predicate.isLowerInclusive()) {
+ if (minValue.compareTo(lowerBoundValue) < 0) {
+ return false;
+ }
+ } else {
+ if (minValue.compareTo(lowerBoundValue) <= 0) {
+ return false;
+ }
+ }
+ }
+ String upperBound = predicate.getUpperBound();
+ if (!upperBound.equals(RangePredicate.UNBOUNDED)) {
+ Comparable upperBoundValue = convertValue(upperBound, dataType);
+ if (predicate.isUpperInclusive()) {
+ if (maxValue.compareTo(upperBoundValue) > 0) {
+ return false;
+ }
+ } else {
+ if (maxValue.compareTo(upperBoundValue) >= 0) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns whether null handling is active for the column, in which case
this optimization must be skipped. Pinot
+ * stores nulls as a default value that pollutes the column min/max metadata
and the total doc count; only when null
+ * handling is enabled are those null rows excluded from comparisons, which
is when the pollution becomes unsafe to
+ * reason about (with null handling disabled, nulls are simply the default
value and the min/max stay accurate). This
+ * mirrors the null caution in {@code SelectionPlanNode#isSorted}.
+ * <p>A column carries null semantics only when null handling is enabled for
the query <b>and</b> the column is
+ * nullable. Nullability follows the same resolution used at segment build
time (e.g.
+ * {@code BaseSegmentCreator#isNullable}): under column-based null handling
the per-column
+ * {@link FieldSpec#isNullable} flag, otherwise (table/query-level null
handling) all columns are nullable. The check
+ * is conservative: an unknown schema or column is treated as
null-handling-active.
+ */
+ private static boolean isNullHandlingActive(QueryContext query, String
column) {
+ if (!query.isNullHandlingEnabled()) {
+ // Null semantics are off: nulls are just the default value, so min/max
and doc counts stay accurate.
+ return false;
+ }
+ Schema schema = query.getSchema();
+ if (schema == null) {
+ return true;
+ }
+ if (schema.isEnableColumnBasedNullHandling()) {
+ // Column-based null handling: only nullable columns carry nulls.
+ FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+ return fieldSpec == null || fieldSpec.isNullable();
+ }
+ // Table/query-level (legacy) null handling applies null semantics to all
columns.
+ return true;
+ }
+
+ private static boolean isNaN(Comparable value) {
+ return (value instanceof Double && ((Double) value).isNaN())
+ || (value instanceof Float && ((Float) value).isNaN());
+ }
+
+ /**
+ * Converts a predicate literal to the column's stored type. Any parse
failure propagates to the caller, which treats
+ * it as "cannot prove full match" (this pruner must stay conservative; an
actually invalid query is rejected by the
+ * preceding {@link ColumnValueSegmentPruner} or by query execution).
+ */
+ private static Comparable convertValue(String stringValue, DataType
dataType) {
+ return dataType.convertInternal(stringValue);
+ }
+
private static class MinMaxValue {
final int _index;
final Comparable _minValue;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
index 2f65ef11af8..fc1052325db 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.testng.annotations.Test;
@@ -37,6 +38,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
@@ -44,6 +46,15 @@ import static org.testng.Assert.assertTrue;
public class SelectionQuerySegmentPrunerTest {
public static final String ORDER_BY_COLUMN = "testColumn";
+ public static final String FILTER_COLUMN = "foo";
+
+ // Schema with a LONG order-by column and a STRING column, used by the
filter-aware tests. Null handling is off, so
+ // these columns are not treated as null-handling-active even though they
are nullable by default.
+ private static final Schema SCHEMA = new Schema.SchemaBuilder()
+ .addSingleValueDimension(ORDER_BY_COLUMN, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(FILTER_COLUMN, FieldSpec.DataType.STRING)
+ .build();
+
private final SelectionQuerySegmentPruner _segmentPruner = new
SelectionQuerySegmentPruner();
@Test
@@ -211,12 +222,279 @@ public class SelectionQuerySegmentPrunerTest {
assertEquals(result.size(), 3);
}
+ /**
+ * Range-partitioned, non-overlapping segments on {@code testColumn}: [0,9],
[10,19], [20,29], [30,39], [40,49].
+ */
+ private List<IndexSegment> rangePartitionedSegments() {
+ return Arrays.asList(
+ getIndexSegment(0L, 9L, 10), // 0
+ getIndexSegment(10L, 19L, 10), // 1
+ getIndexSegment(20L, 29L, 10), // 2
+ getIndexSegment(30L, 39L, 10), // 3
+ getIndexSegment(40L, 49L, 10)); // 4
+ }
+
+ private List<IndexSegment> prune(List<IndexSegment> segments, String query) {
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(query);
+ queryContext.setSchema(SCHEMA);
+ return _segmentPruner.prune(segments, queryContext);
+ }
+
+ @Test
+ public void testSelectionOrderByWithFilterDesc() {
+ List<IndexSegment> segments = rangePartitionedSegments();
+ // DESC, want largest 5 with col > 25: all live in [40,49]. Only the top
fully-matching segment is kept.
+ List<IndexSegment> result =
+ prune(segments, "SELECT * FROM testTable WHERE testColumn > 25 ORDER
BY testColumn DESC LIMIT 5");
+ assertEquals(result, List.of(segments.get(4)));
+
+ // >= behaves the same here (min 40 >= 30).
+ result = prune(segments, "SELECT * FROM testTable WHERE testColumn >= 30
ORDER BY testColumn DESC LIMIT 5");
+ assertEquals(result, List.of(segments.get(4)));
+ }
+
+ @Test
+ public void testSelectionOrderByWithFilterAsc() {
+ List<IndexSegment> segments = rangePartitionedSegments();
+ // ASC, want smallest 15 with col < 25: live in [0,9] (0-9) and [10,19]
(10-14). Higher segments are pruned.
+ List<IndexSegment> result =
+ prune(segments, "SELECT * FROM testTable WHERE testColumn < 25 ORDER
BY testColumn LIMIT 15");
+ assertEquals(result, List.of(segments.get(0), segments.get(1)));
+
+ // <= behaves the same here (max 19 <= 24).
+ result = prune(segments, "SELECT * FROM testTable WHERE testColumn <= 24
ORDER BY testColumn LIMIT 15");
+ assertEquals(result, List.of(segments.get(0), segments.get(1)));
+ }
+
+ @Test
+ public void testFilterStraddlingSegmentIsKept() {
+ // Correctness counterexample for the lower-bound rule: DESC, col < 35,
LIMIT 5. The answer (34..30) lives in the
+ // straddling segment [30,39], which is counted as 0 matching docs. A
naive getTotalDocs() accumulation would let
+ // the boundary advance past it and prune it; the lower-bound rule must
keep it.
+ List<IndexSegment> segments = rangePartitionedSegments();
+ List<IndexSegment> result =
+ prune(segments, "SELECT * FROM testTable WHERE testColumn < 35 ORDER
BY testColumn DESC LIMIT 5");
+ assertTrue(result.contains(segments.get(3)), "straddling segment [30,39]
holding the top-n must be kept");
+ }
+
+ @Test
+ public void testFilterEqAndNotEq() {
+ // EQ fully matches only a single-valued segment; here add one collapsed
to 25.
+ List<IndexSegment> segments = Arrays.asList(
+ getIndexSegment(0L, 9L, 10), // 0
+ getIndexSegment(25L, 25L, 10), // 1 (single value 25)
+ getIndexSegment(40L, 49L, 10)); // 2
+ // col = 25 ORDER BY col DESC LIMIT 5 -> only [25,25] fully matches;
[40,49] sorts ahead of it, counts as 0 and is kept, so only containment is asserted.
+ List<IndexSegment> result =
+ prune(segments, "SELECT * FROM testTable WHERE testColumn = 25 ORDER
BY testColumn DESC LIMIT 5");
+ assertTrue(result.contains(segments.get(1)));
+
+ // col <> 25 DESC LIMIT 5 -> largest != 25 live in [40,49]; that segment
fully matches (25 outside [40,49]).
+ result = prune(segments, "SELECT * FROM testTable WHERE testColumn <> 25
ORDER BY testColumn DESC LIMIT 5");
+ assertEquals(result, List.of(segments.get(2)));
+ }
+
+ @Test
+ public void testAndConjunctOnNonProvableColumnDisablesPruning() {
+ // col > 25 is provably-full on the high segments, but the ANDed predicate
on 'foo' (a STRING column whose mocked
+ // [min, max] is ["a", "z"]) cannot be proven full for any segment -> 0 lower bound
everywhere -> nothing is pruned (safe).
+ List<IndexSegment> segments = rangePartitionedSegments();
+ List<IndexSegment> result = prune(segments,
+ "SELECT * FROM testTable WHERE testColumn > 25 AND foo = 'x' ORDER BY
testColumn DESC LIMIT 5");
+ assertEquals(result.size(), segments.size());
+ }
+
+ @Test
+ public void testOrFilterFullMatch() {
+ // LIMIT exceeds the total doc count (5 segments * 10 docs), so
guaranteedMatchingDocs() -> fullyMatches() is
+ // evaluated for every segment: the OR is fully matched when ANY child is
(seg [30,39], [40,49] via testColumn > 25)
+ // and not matched otherwise. No segment can be pruned (the limit needs
them all), so the result is unchanged.
+ List<IndexSegment> segments = rangePartitionedSegments();
+ List<IndexSegment> result = prune(segments,
+ "SELECT * FROM testTable WHERE testColumn > 25 OR testColumn > 1000
ORDER BY testColumn LIMIT 100");
+ assertEquals(result.size(), segments.size());
+ }
+
+ @Test
+ public void testFilterNotProvablyFullCases() {
+ // Each of these filters cannot be proven full-match from min/max
metadata, so every segment contributes 0 and
+ // (with a LIMIT larger than the data) nothing is pruned. They exercise
the conservative fall-through branches.
+ List<IndexSegment> segments = rangePartitionedSegments();
+ // Predicate on a non-identifier (function) expression.
+ assertEquals(prune(segments,
+ "SELECT * FROM testTable WHERE testColumn + 1 > 5 ORDER BY testColumn
LIMIT 100").size(), segments.size());
+ // Unsupported predicate type (IN) for the full-match analysis.
+ assertEquals(prune(segments,
+ "SELECT * FROM testTable WHERE testColumn IN (1, 2, 3) ORDER BY
testColumn LIMIT 100").size(), segments.size());
+ // NOT filter is never treated as full-match.
+ assertEquals(prune(segments,
+ "SELECT * FROM testTable WHERE NOT (testColumn IN (1, 2, 3)) ORDER BY
testColumn LIMIT 100").size(),
+ segments.size());
+ // Bound larger than any value: no segment is fully contained, so nothing
is pruned.
+ assertEquals(prune(segments,
+ "SELECT * FROM testTable WHERE testColumn > 99999999999999999999999
ORDER BY testColumn LIMIT 100").size(),
+ segments.size());
+ }
+
+ @Test
+ public void testAndAllChildrenFullMatch() {
+ // AND where every child is provably full on the top segment -> AND
returns true and the set is limit-pruned.
+ List<IndexSegment> segments = rangePartitionedSegments();
+ List<IndexSegment> result = prune(segments,
+ "SELECT * FROM testTable WHERE testColumn > 25 AND testColumn < 1000
ORDER BY testColumn DESC LIMIT 5");
+ assertEquals(result, List.of(segments.get(4)));
+ }
+
+ @Test
+ public void testRangeInclusiveBoundsNotFull() {
+ // Inclusive bounds where some processed segments are not fully contained,
exercising the inclusive "not full"
+ // branches. LIMIT exceeds the data so nothing is pruned.
+ List<IndexSegment> segments = rangePartitionedSegments();
+ assertEquals(prune(segments,
+ "SELECT * FROM testTable WHERE testColumn >= 25 ORDER BY testColumn
LIMIT 100").size(), segments.size());
+ assertEquals(prune(segments,
+ "SELECT * FROM testTable WHERE testColumn <= 25 ORDER BY testColumn
LIMIT 100").size(), segments.size());
+ }
+
+ @Test
+ public void testPruneCalledWithNullHandlingActive() {
+ // prune() invoked directly (bypassing isApplicableTo) with null handling
on: the predicate column is
+ // null-handling-active, so it is never provably full and nothing is
pruned.
+ List<IndexSegment> segments = rangePartitionedSegments();
+ List<IndexSegment> result = prune(segments, "SET enableNullHandling=true; "
+ + "SELECT * FROM testTable WHERE testColumn > 25 ORDER BY testColumn
DESC LIMIT 100");
+ assertEquals(result.size(), segments.size());
+ }
+
+ @Test
+ public void testFilterPredicateColumnWithoutMinMax() {
+ // The filter column has no min/max metadata, so it cannot be proven
full-match -> 0 contribution, nothing pruned.
+ List<IndexSegment> segments = Arrays.asList(
+ getIndexSegment(0L, 9L, 10, false, null, null),
+ getIndexSegment(10L, 19L, 10, false, null, null));
+ List<IndexSegment> result =
+ prune(segments, "SELECT * FROM testTable WHERE foo >= 'a' ORDER BY
testColumn DESC LIMIT 5");
+ assertEquals(result.size(), segments.size());
+ }
+
+ @Test
+ public void testFilteredSelectionOnlyNotPruned() {
+ // Selection-only (no ORDER BY) with a filter: count-based pruning is
unsafe, so prune() keeps every segment.
+ List<IndexSegment> segments = rangePartitionedSegments();
+ List<IndexSegment> result = prune(segments, "SELECT * FROM testTable WHERE
testColumn > 25 LIMIT 5");
+ assertEquals(result.size(), segments.size());
+ }
+
+ @Test
+ public void testFilterWithOffset() {
+ List<IndexSegment> segments = rangePartitionedSegments();
+ // DESC, col > 25, LIMIT 5 OFFSET 30. remainingDocs = 35; the two
fully-matching segments [30,39] and [40,49]
+ // provide only 20 matching docs, so the requirement is never met and
no segment is pruned; all five are kept.
+ List<IndexSegment> result =
+ prune(segments, "SELECT * FROM testTable WHERE testColumn > 25 ORDER
BY testColumn DESC LIMIT 5, 30");
+ assertEquals(result.size(), segments.size());
+ }
+
+ @Test
+ public void testFloatNaNNotTreatedAsFullMatch() {
+ // Regression: an all-NaN DOUBLE segment must not be counted as fully
matching 'col > 5'. NaN sorts as the largest
+ // value, so without the NaN guard the all-NaN segment would (a) be
counted as 10 matching docs and (b) sort first
+ // (DESC), advancing the boundary and wrongly pruning the segment [10, 20]
that actually holds the top-n.
+ Schema doubleSchema =
+ new Schema.SchemaBuilder().addSingleValueDimension(ORDER_BY_COLUMN,
FieldSpec.DataType.DOUBLE).build();
+ IndexSegment nanSegment = getDoubleSegment(Double.NaN, Double.NaN, 10);
+ IndexSegment realSegment = getDoubleSegment(10.0, 20.0, 10);
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn DESC
LIMIT 5");
+ queryContext.setSchema(doubleSchema);
+ List<IndexSegment> result = _segmentPruner.prune(Arrays.asList(nanSegment,
realSegment), queryContext);
+ assertTrue(result.contains(realSegment), "segment [10, 20] holding the
top-n must not be pruned");
+ }
+
+ private IndexSegment getDoubleSegment(Double minValue, Double maxValue, int
totalDocs) {
+ IndexSegment indexSegment = mock(IndexSegment.class);
+ DataSource dataSource = mock(DataSource.class);
+ when(indexSegment.getDataSource(eq(ORDER_BY_COLUMN),
any(Schema.class))).thenReturn(dataSource);
+ DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
+ when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
+ when(dataSourceMetadata.getMinValue()).thenReturn(minValue);
+ when(dataSourceMetadata.getMaxValue()).thenReturn(maxValue);
+
when(dataSourceMetadata.getDataType()).thenReturn(FieldSpec.DataType.DOUBLE);
+ SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+ when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
+ return indexSegment;
+ }
+
+ @Test
+ public void testIsApplicableTo() {
+ // No filter: applicable (existing behavior), with or without order by.
+ assertTrue(_segmentPruner.isApplicableTo(queryWithSchema("SELECT * FROM
testTable LIMIT 5")));
+ assertTrue(_segmentPruner.isApplicableTo(queryWithSchema("SELECT * FROM
testTable ORDER BY testColumn LIMIT 5")));
+ // LIMIT 0 with a filter: applicable (just keeps one segment for the
schema).
+ assertTrue(_segmentPruner.isApplicableTo(
+ queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 ORDER BY
testColumn LIMIT 0")));
+ // Filtered order-by on an identifier with null handling disabled: applicable.
+ assertTrue(_segmentPruner.isApplicableTo(
+ queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 ORDER BY
testColumn DESC LIMIT 5")));
+ // Filtered selection-only (no order by): not applicable (count-based
pruning unsafe with a filter).
+ assertFalse(_segmentPruner.isApplicableTo(queryWithSchema("SELECT * FROM
testTable WHERE testColumn > 5 LIMIT 5")));
+ // Filtered order-by on a non-identifier: not applicable.
+ assertFalse(_segmentPruner.isApplicableTo(
+ queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 ORDER BY
testColumn + 1 DESC LIMIT 5")));
+ // Filtered order-by but null handling enabled -> column treated as
nullable -> not applicable.
+ assertFalse(_segmentPruner.isApplicableTo(queryWithSchema(
+ "SET enableNullHandling=true; SELECT * FROM testTable WHERE testColumn
> 5 ORDER BY testColumn DESC LIMIT 5")));
+ // Null handling on but no schema available -> conservatively treated as
null-handling-active -> not applicable.
+
assertFalse(_segmentPruner.isApplicableTo(QueryContextConverterUtils.getQueryContext(
+ "SET enableNullHandling=true; SELECT * FROM testTable WHERE testColumn
> 5 ORDER BY testColumn DESC LIMIT 5")));
+ }
+
+ @Test
+ public void testIsApplicableToColumnBasedNullHandling() {
+ String query = "SET enableNullHandling=true; "
+ + "SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn
DESC LIMIT 5";
+
+ // Column-based null handling + non-nullable column: nulls cannot occur,
so the optimization still applies even
+ // though null handling is enabled.
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(query);
+ queryContext.setSchema(columnBasedNullHandlingSchema(false));
+ assertTrue(_segmentPruner.isApplicableTo(queryContext));
+
+ // Column-based null handling + nullable column: skip (min/max may be
polluted by nulls).
+ queryContext = QueryContextConverterUtils.getQueryContext(query);
+ queryContext.setSchema(columnBasedNullHandlingSchema(true));
+ assertFalse(_segmentPruner.isApplicableTo(queryContext));
+ }
+
+ private static Schema columnBasedNullHandlingSchema(boolean
orderByColumnNullable) {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(ORDER_BY_COLUMN, FieldSpec.DataType.LONG)
+ .setEnableColumnBasedNullHandling(true)
+ .build();
+ schema.getFieldSpecFor(ORDER_BY_COLUMN).setNullable(orderByColumnNullable);
+ return schema;
+ }
+
+ private QueryContext queryWithSchema(String query) {
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(query);
+ queryContext.setSchema(SCHEMA);
+ return queryContext;
+ }
+
private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long
maxValue, int totalDocs) {
return getIndexSegment(minValue, maxValue, totalDocs, false);
}
private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long
maxValue, int totalDocs,
boolean upsert) {
+ // A STRING filter column whose [min, max] = ["a", "z"] never collapses to a single value, so a predicate like
+ // 'foo = x' is never provably full-match -> exercises the "AND with a non-provable conjunct" path.
+ return getIndexSegment(minValue, maxValue, totalDocs, upsert, "a", "z");
+ }
+
+ private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long
maxValue, int totalDocs,
+ boolean upsert, @Nullable String filterMinValue, @Nullable String
filterMaxValue) {
IndexSegment indexSegment = mock(IndexSegment.class);
when(indexSegment.getColumnNames()).thenReturn(ImmutableSet.of("foo",
"testColumn"));
DataSource dataSource = mock(DataSource.class);
@@ -225,6 +503,14 @@ public class SelectionQuerySegmentPrunerTest {
when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
when(dataSourceMetadata.getMinValue()).thenReturn(minValue);
when(dataSourceMetadata.getMaxValue()).thenReturn(maxValue);
+ when(dataSourceMetadata.getDataType()).thenReturn(FieldSpec.DataType.LONG);
+ DataSource filterDataSource = mock(DataSource.class);
+ when(indexSegment.getDataSource(eq(FILTER_COLUMN),
any(Schema.class))).thenReturn(filterDataSource);
+ DataSourceMetadata filterMetadata = mock(DataSourceMetadata.class);
+ when(filterDataSource.getDataSourceMetadata()).thenReturn(filterMetadata);
+ when(filterMetadata.getMinValue()).thenReturn(filterMinValue);
+ when(filterMetadata.getMaxValue()).thenReturn(filterMaxValue);
+ when(filterMetadata.getDataType()).thenReturn(FieldSpec.DataType.STRING);
SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkSelectionOrderByFilterPruning.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkSelectionOrderByFilterPruning.java
new file mode 100644
index 00000000000..ca35f055433
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkSelectionOrderByFilterPruning.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.query.config.SegmentPrunerConfig;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.queries.BaseQueriesTest;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/**
+ * End-to-end benchmark for the min/max segment pruning of selection {@code ORDER BY <col> LIMIT n} queries with a
+ * filter (issue apache/pinot#18685).
+ * <p>The table is range-partitioned on a sorted, nullable {@code TS_COL}: segment {@code i} covers the contiguous range
+ * {@code [i * numRows, (i + 1) * numRows)}. Each invocation runs the segment pruners (the default chain, which respects
+ * {@link org.apache.pinot.core.query.pruner.SegmentPruner#isApplicableTo}) and then executes the surviving segments via
+ * the single-stage engine.
+ * <p>The comparison is self-contained (no need to diff against a base commit): the optimization is gated off when null
+ * handling is active, so running the same filtered query <b>with</b> {@code enableNullHandling=true} reproduces the
+ * pre-fix behavior (the filtered order-by is not limit-pruned and every matching segment is engaged), while running it
+ * <b>without</b> null handling exercises the fix (only the top segment(s) survive). The {@code NO_FILTER} query is a
+ * reference point that is limit-pruned regardless.
+ */
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkSelectionOrderByFilterPruning extends BaseQueriesTest {
+ private static final File INDEX_DIR =
+ new File(FileUtils.getTempDirectory(),
"BenchmarkSelectionOrderByFilterPruning");
+ private static final String TABLE_NAME = "MyTable";
+ private static final String SEGMENT_NAME_TEMPLATE = "testSegment%d";
+ private static final String TS_COL = "TS_COL";
+ private static final String VAL_COL = "VAL_COL";
+
+ private static final TableConfig TABLE_CONFIG = new
TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(TABLE_NAME)
+ .setSortedColumn(TS_COL)
+ .build();
+ private static final Schema SCHEMA = new Schema.SchemaBuilder()
+ .setSchemaName(TABLE_NAME)
+ .addSingleValueDimension(TS_COL, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(VAL_COL, FieldSpec.DataType.INT)
+ .build();
+
+ /** Which query shape to run; see {@link #buildQuery}. */
+ public enum Mode {
+ /** Filtered order-by, optimization eligible (limit-pruned to the segments the LIMIT needs). */
+ FILTER,
+ /** Same filtered query but with null handling on, which gates the optimization off (pre-fix behavior). */
+ FILTER_NULL_HANDLING,
+ /** No filter: limit-pruned regardless of the fix (reference point). */
+ NO_FILTER
+ }
+
+ @Param({"50", "100", "200"})
+ private int _numSegments;
+ // Small segments so a larger LIMIT spans several of them.
+ @Param({"20000"})
+ private int _numRows;
+ @Param({"10", "200000"})
+ private int _limit;
+ @Param({"FILTER", "FILTER_NULL_HANDLING", "NO_FILTER"})
+ private Mode _mode;
+
+ private String _query;
+ private List<IndexSegment> _allSegments;
+ private List<IndexSegment> _selectedSegments;
+ private SegmentPrunerService _segmentPrunerService;
+
+ private static String buildQuery(Mode mode, int limit) {
+ String query = "SELECT * FROM MyTable "
+ + (mode == Mode.NO_FILTER ? "" : "WHERE TS_COL > 0 ")
+ + "ORDER BY TS_COL DESC LIMIT " + limit;
+ return mode == Mode.FILTER_NULL_HANDLING ? "SET enableNullHandling=true; "
+ query : query;
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ ChainedOptionsBuilder opt =
+ new
OptionsBuilder().include(BenchmarkSelectionOrderByFilterPruning.class.getSimpleName());
+ new Runner(opt.build()).run();
+ }
+
+ @Setup
+ public void setUp()
+ throws Exception {
+ _query = buildQuery(_mode, _limit);
+ FileUtils.deleteQuietly(INDEX_DIR);
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+ _allSegments = new ArrayList<>(_numSegments);
+ for (int i = 0; i < _numSegments; i++) {
+ String name = String.format(SEGMENT_NAME_TEMPLATE, i);
+ buildSegment(name, i);
+ _allSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, name),
indexLoadingConfig));
+ }
+ _selectedSegments = _allSegments;
+ _segmentPrunerService = new SegmentPrunerService(new
SegmentPrunerConfig(new PinotConfiguration()));
+ }
+
+ @TearDown
+ public void tearDown() {
+ for (IndexSegment indexSegment : _allSegments) {
+ indexSegment.destroy();
+ }
+ FileUtils.deleteQuietly(INDEX_DIR);
+ EXECUTOR_SERVICE.shutdownNow();
+ }
+
+ private void buildSegment(String segmentName, int segmentIndex)
+ throws Exception {
+ long baseValue = (long) segmentIndex * _numRows;
+ List<GenericRow> rows = new ArrayList<>(_numRows);
+ for (int i = 0; i < _numRows; i++) {
+ GenericRow row = new GenericRow();
+ // Contiguous, non-overlapping, sorted range per segment.
+ row.putValue(TS_COL, baseValue + i);
+ row.putValue(VAL_COL, ThreadLocalRandom.current().nextInt());
+ rows.add(row);
+ }
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG,
SCHEMA);
+ config.setOutDir(INDEX_DIR.getPath());
+ config.setTableName(TABLE_NAME);
+ config.setSegmentName(segmentName);
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+ driver.init(config, recordReader);
+ driver.build();
+ }
+ }
+
+ @Benchmark
+ public BrokerResponseNative query() {
+ // Prune first (this is what the fix affects), then execute the surviving segments through the single-stage engine.
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(_query);
+ queryContext.setSchema(SCHEMA);
+ _selectedSegments = _segmentPrunerService.prune(_allSegments,
queryContext);
+ return getBrokerResponse(_query);
+ }
+
+ @Override
+ protected String getFilter() {
+ return "";
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _allSegments.get(0);
+ }
+
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+ return _selectedSegments;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]