This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 45cf67d09e1 fixes for filtered projections that produce empty
projection tables (#18508)
45cf67d09e1 is described below
commit 45cf67d09e10aaf493556d2fe512f2034eead897
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Sep 9 16:37:37 2025 -0700
fixes for filtered projections that produce empty projection tables (#18508)
---
.../QueryableIndexTimeBoundaryInspector.java | 10 +-
.../druid/segment/CursorFactoryProjectionTest.java | 216 ++++++++++++++++++++-
2 files changed, 214 insertions(+), 12 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexTimeBoundaryInspector.java
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexTimeBoundaryInspector.java
index 28292fc2439..9e700b2ceb4 100644
---
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexTimeBoundaryInspector.java
+++
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexTimeBoundaryInspector.java
@@ -54,6 +54,9 @@ public class QueryableIndexTimeBoundaryInspector implements
TimeBoundaryInspecto
@MonotonicNonNull
private volatile DateTime maxTime;
+ @MonotonicNonNull
+ private volatile Integer numRows;
+
@Override
public DateTime getMinTime()
{
@@ -79,12 +82,15 @@ public class QueryableIndexTimeBoundaryInspector implements
TimeBoundaryInspecto
@Override
public boolean isMinMaxExact()
{
- return timeOrdered;
+ if (numRows == null) {
+ numRows = index.getNumRows();
+ }
+ return timeOrdered && numRows > 0;
}
private void populateMinMaxTime()
{
- if (timeOrdered) {
+ if (isMinMaxExact()) {
// Compute and cache minTime, maxTime.
final ColumnHolder columnHolder =
index.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME);
try (NumericColumn column = (NumericColumn) columnHolder.getColumn()) {
diff --git
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index ad49dc11eb4..21b694c56b8 100644
---
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -277,12 +277,48 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
.build(),
AggregateProjectionSpec.builder("a_concat_b_d_plus_f_sum_c")
.virtualColumns(
- new ExpressionVirtualColumn("__vc2", "d + e",
ColumnType.LONG, TestExprMacroTable.INSTANCE),
- new ExpressionVirtualColumn("__vc3",
"concat(a, b)", ColumnType.STRING, TestExprMacroTable.INSTANCE)
+ new ExpressionVirtualColumn(
+ "__vc2",
+ "d + e",
+ ColumnType.LONG,
+ TestExprMacroTable.INSTANCE
+ ),
+ new ExpressionVirtualColumn(
+ "__vc3",
+ "concat(a, b)",
+ ColumnType.STRING,
+ TestExprMacroTable.INSTANCE
+ )
)
.groupingColumns(new
LongDimensionSchema("__vc2"), new StringDimensionSchema("__vc3"))
.aggregators(new
LongSumAggregatorFactory("sum_c", "c"))
.build(),
+ AggregateProjectionSpec.builder("a_hourly_c_sum_filter_a_to_a")
+ .filter(
+ new EqualityFilter("a", ColumnType.STRING,
"a", null)
+ )
+
.virtualColumns(Granularities.toVirtualColumn(Granularities.HOUR, "__gran"))
+ .groupingColumns(
+ new StringDimensionSchema("a"),
+ new LongDimensionSchema("__gran")
+ )
+ .aggregators(
+ new LongSumAggregatorFactory("_c_sum", "c")
+ )
+ .build(),
+ AggregateProjectionSpec.builder("a_hourly_c_sum_filter_a_to_empty")
+ .filter(
+ new EqualityFilter("a", ColumnType.STRING,
"nomatch", null)
+ )
+
.virtualColumns(Granularities.toVirtualColumn(Granularities.HOUR, "__gran"))
+ .groupingColumns(
+ new StringDimensionSchema("a"),
+ new LongDimensionSchema("__gran")
+ )
+ .aggregators(
+ new LongSumAggregatorFactory("_c_sum", "c")
+ )
+ .build(),
AggregateProjectionSpec.builder("time_and_a")
.groupingColumns(
new
LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME),
@@ -298,10 +334,10 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
new LongDimensionSchema("__gran"),
new StringDimensionSchema("a")
)
- .aggregators(
- new CountAggregatorFactory("chocula"),
- new LongSumAggregatorFactory("sum_c", "sum_c")
- )
+ .aggregators(
+ new CountAggregatorFactory("chocula"),
+ new LongSumAggregatorFactory("sum_c", "sum_c")
+ )
.build(),
AggregateProjectionSpec.builder("afoo")
.virtualColumns(
@@ -340,7 +376,10 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
.groupingColumns(
projection.getGroupingColumns()
.stream()
- .map(x ->
new AutoTypeColumnSchema(x.getName(), null))
+ .map(x ->
new AutoTypeColumnSchema(
+
x.getName(),
+ null
+ ))
.collect(Collectors.toList())
)
.build()
@@ -1275,6 +1314,7 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
@Test
public void testTimeseriesQueryGranularityFinerThanProjectionGranularity()
{
+ // timeseries query only works on base table if base table is sorted by
time
Assume.assumeTrue(segmentSortedByTime);
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
@@ -1484,6 +1524,159 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
);
}
+ @Test
+ public void testProjectionFilteredProjectionMatch()
+ {
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .setDimFilter(new EqualityFilter("a", ColumnType.STRING,
"a", null))
+ .addDimension("a")
+ .build();
+
+ final boolean isRealtime = projectionsCursorFactory instanceof
IncrementalIndexCursorFactory;
+ // realtime projections don't have row count, so abfoo is chosen because
of how projection sorting happens
+ final ExpectedProjectionGroupBy queryMetrics = new
ExpectedProjectionGroupBy(isRealtime ? "abfoo" :
"a_hourly_c_sum_filter_a_to_a");
+
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, queryMetrics);
+
+ assertCursorProjection(buildSpec, queryMetrics, isRealtime ? 4 : 2);
+
+ testGroupBy(
+ query,
+ queryMetrics,
+ makeArrayResultSet(
+ new Object[]{"a"}
+ )
+ );
+ }
+
+ @Test
+ public void testProjectionFilteredNoFilteredProjectionMatch()
+ {
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .setDimFilter(new EqualityFilter("a", ColumnType.STRING,
"b", null))
+ .addDimension("a")
+ .build();
+
+ final boolean isRealtime = projectionsCursorFactory instanceof
IncrementalIndexCursorFactory;
+ final ExpectedProjectionGroupBy queryMetrics = new
ExpectedProjectionGroupBy(isRealtime ? "abfoo" :
"a_hourly_c_sum_with_count_latest");
+
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, queryMetrics);
+
+ assertCursorProjection(buildSpec, queryMetrics, isRealtime ? 2 : 1);
+
+ testGroupBy(
+ query,
+ queryMetrics,
+ makeArrayResultSet(
+ new Object[]{"b"}
+ )
+ );
+ }
+
+ @Test
+ public void testProjectionFilteredToEmpty()
+ {
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .setDimFilter(new EqualityFilter("a", ColumnType.STRING,
"nomatch", null))
+ .setContext(Map.of("useProjection",
"a_hourly_c_sum_filter_a_to_empty"))
+ .addDimension("a")
+ .build();
+
+ final ExpectedProjectionGroupBy queryMetrics = new
ExpectedProjectionGroupBy("a_hourly_c_sum_filter_a_to_empty");
+
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, queryMetrics);
+
+ assertCursorProjection(buildSpec, queryMetrics, 0);
+
+ testGroupBy(
+ query,
+ queryMetrics,
+ makeArrayResultSet()
+ );
+ }
+
+ @Test
+ public void testProjectionFilteredToEmptyTimeseries()
+ {
+ final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+
.intervals(ImmutableList.of(Intervals.ETERNITY))
+ .granularity(Granularities.ALL)
+ .filters(new EqualityFilter("a",
ColumnType.STRING, "nomatch", null))
+ .aggregators(new
LongSumAggregatorFactory("c_sum", "c"))
+
.context(ImmutableMap.of(QueryContexts.USE_PROJECTION,
"a_hourly_c_sum_filter_a_to_empty"))
+ .build();
+
+ final ExpectedProjectionTimeseries queryMetrics =
+ new ExpectedProjectionTimeseries("a_hourly_c_sum_filter_a_to_empty");
+
+ final CursorBuildSpec buildSpec =
TimeseriesQueryEngine.makeCursorBuildSpec(query, queryMetrics);
+
+ assertCursorProjection(buildSpec, queryMetrics, 0);
+
+ // realtime results are inconsistent between projection and base table
since projection is totally empty, but base
+ // table is reduced with filter
+ final boolean isRealtime = projectionsCursorFactory instanceof
IncrementalIndexCursorFactory;
+ final List<Object[]> expectedResults = Collections.singletonList(new
Object[]{TIMESTAMP, null});
+ final List<Object[]> expectedRealtimeResults = List.of();
+
+ final Sequence<Result<TimeseriesResultValue>> resultRows =
timeseriesEngine.process(
+ query,
+ projectionsCursorFactory,
+ projectionsTimeBoundaryInspector,
+ queryMetrics
+ );
+
+ queryMetrics.assertProjection();
+
+ final List<Result<TimeseriesResultValue>> results = resultRows.toList();
+ assertTimeseriesResults(
+ query.getResultRowSignature(RowSignature.Finalization.YES),
+ isRealtime ? expectedRealtimeResults : expectedResults,
+ results
+ );
+
+
+ Assertions.assertEquals(TIMESTAMP,
projectionsTimeBoundaryInspector.getMinTime());
+ if (isRealtime || segmentSortedByTime) {
+ Assertions.assertEquals(TIMESTAMP.plusHours(1).plusMinutes(1),
projectionsTimeBoundaryInspector.getMaxTime());
+ } else {
+ // min and max time are inexact for non time ordered segments
+ Assertions.assertEquals(
+ TIMESTAMP.plusHours(1).plusMinutes(1).plusMillis(1),
+ projectionsTimeBoundaryInspector.getMaxTime()
+ );
+ }
+
+ // timeseries query only works on base table if base table is sorted by
time
+ Assume.assumeTrue(segmentSortedByTime);
+ final Sequence<Result<TimeseriesResultValue>> resultRowsNoProjection =
timeseriesEngine.process(
+ query.withOverriddenContext(Map.of(QueryContexts.NO_PROJECTIONS,
true)),
+ projectionsCursorFactory,
+ projectionsTimeBoundaryInspector,
+ queryMetrics
+ );
+
+ final List<Result<TimeseriesResultValue>> resultsNoProjection =
resultRowsNoProjection.toList();
+ assertTimeseriesResults(
+ query.getResultRowSignature(RowSignature.Finalization.YES),
+ expectedResults,
+ resultsNoProjection
+ );
+ }
+
@Test
public void testProjectionGroupOnTime()
{
@@ -1631,6 +1824,7 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
final List<Result<TimeseriesResultValue>> results = resultRows.toList();
assertTimeseriesResults(query.getResultRowSignature(RowSignature.Finalization.YES),
expectedResults, results);
+ // timeseries query only works on base table if base table is sorted by
time
Assume.assumeTrue(segmentSortedByTime);
final Sequence<Result<TimeseriesResultValue>> resultRowsNoProjection =
timeseriesEngine.process(
query.withOverriddenContext(Map.of(QueryContexts.NO_PROJECTIONS,
true)),
@@ -1700,9 +1894,11 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
queryMetrics.assertProjection();
final Cursor cursor = cursorHolder.asCursor();
int rowCount = 0;
- while (!cursor.isDone()) {
- rowCount++;
- cursor.advance();
+ if (cursor != null) {
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
}
Assert.assertEquals(expectedRowCount, rowCount);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]