This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 547369d6a8c fix projection cursor build spec interval matching to
consider projection granularity (#18486)
547369d6a8c is described below
commit 547369d6a8cca06adc49b6bfc4f31c27316a7ec6
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Sep 8 21:37:07 2025 -0700
fix projection cursor build spec interval matching to consider projection
granularity (#18486)
---
.../apache/druid/segment/SimpleQueryableIndex.java | 1 +
.../incremental/OnheapIncrementalIndex.java | 1 +
.../druid/segment/projections/Projections.java | 33 ++-
.../druid/segment/CursorFactoryProjectionTest.java | 59 +++++-
.../druid/segment/projections/ProjectionsTest.java | 223 +++++++++++++++++++++
5 files changed, 314 insertions(+), 3 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
index 72ac3b7240e..fa81414dd5a 100644
---
a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
@@ -224,6 +224,7 @@ public abstract class SimpleQueryableIndex implements
QueryableIndex
return Projections.findMatchingProjection(
cursorBuildSpec,
projections,
+ dataInterval,
(projectionName, columnName) ->
projectionColumns.get(projectionName).containsKey(columnName) ||
getColumnCapabilities(columnName) == null,
this::getProjectionQueryableIndex
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index f74812b4b22..3f8768c64ce 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -395,6 +395,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
return Projections.findMatchingProjection(
buildSpec,
aggregateProjections,
+ getInterval(),
(specName, columnName) ->
projections.get(specName).getDimensionsMap().containsKey(columnName)
|| getColumnCapabilities(columnName) == null,
projections::get
diff --git
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
index 8d66f7539e5..7370cff6355 100644
---
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
+++
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
@@ -32,6 +32,8 @@ import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.HashMap;
@@ -48,6 +50,7 @@ public class Projections
public static <T> QueryableProjection<T> findMatchingProjection(
CursorBuildSpec cursorBuildSpec,
SortedSet<AggregateProjectionMetadata> projections,
+ Interval dataInterval,
PhysicalColumnChecker physicalChecker,
Function<String, T> getRowSelector
)
@@ -62,7 +65,7 @@ public class Projections
if (name != null && !name.equals(spec.getSchema().getName())) {
continue;
}
- final ProjectionMatch match =
matchAggregateProjection(spec.getSchema(), cursorBuildSpec, physicalChecker);
+ final ProjectionMatch match =
matchAggregateProjection(spec.getSchema(), cursorBuildSpec, dataInterval,
physicalChecker);
if (match != null) {
if (cursorBuildSpec.getQueryMetrics() != null) {
cursorBuildSpec.getQueryMetrics().projection(spec.getSchema().getName());
@@ -108,6 +111,7 @@ public class Projections
public static ProjectionMatch matchAggregateProjection(
AggregateProjectionMetadata.Schema projection,
CursorBuildSpec queryCursorBuildSpec,
+ Interval dataInterval,
PhysicalColumnChecker physicalColumnChecker
)
{
@@ -117,6 +121,10 @@ public class Projections
if
(CollectionUtils.isNullOrEmpty(queryCursorBuildSpec.getPhysicalColumns())) {
return null;
}
+
+ if (isUnalignedInterval(projection, queryCursorBuildSpec, dataInterval)) {
+ return null;
+ }
ProjectionMatchBuilder matchBuilder = new ProjectionMatchBuilder();
// match virtual columns first, which will populate the 'remapColumns' of
the match builder
@@ -439,11 +447,32 @@ public class Projections
return projectionSpec.getSchema().getName() + "/";
}
+ /**
+ * Returns true when the query {@link CursorBuildSpec} interval neither contains the entire
data interval nor has both its start and end
+ * aligned with {@link
AggregateProjectionMetadata.Schema#getEffectiveGranularity()}; such a query cannot use the projection
+ */
+ private static boolean isUnalignedInterval(
+ AggregateProjectionMetadata.Schema projection,
+ CursorBuildSpec queryCursorBuildSpec,
+ Interval dataInterval
+ )
+ {
+ final Interval queryInterval = queryCursorBuildSpec.getInterval();
+ if (!queryInterval.contains(dataInterval)) {
+ final Granularity granularity = projection.getEffectiveGranularity();
+ final DateTime start = queryInterval.getStart();
+ final DateTime end = queryInterval.getEnd();
+ // the interval filter must align with the projection granularity for a
match to be valid
+ return !start.equals(granularity.bucketStart(start)) ||
!end.equals(granularity.bucketStart(end));
+ }
+ return false;
+ }
+
/**
* Returns true if column is defined in {@link
AggregateProjectionSpec#getGroupingColumns()} OR if the column does not
* exist in the base table. Part of determining if a projection can be used
for a given {@link CursorBuildSpec},
*
- * @see #matchAggregateProjection(AggregateProjectionMetadata.Schema,
CursorBuildSpec, PhysicalColumnChecker)
+ * @see #matchAggregateProjection(AggregateProjectionMetadata.Schema,
CursorBuildSpec, Interval, PhysicalColumnChecker)
*/
@FunctionalInterface
public interface PhysicalColumnChecker
diff --git
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index 10664880ebe..ad49dc11eb4 100644
---
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -82,6 +82,7 @@ import
org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.virtual.NestedFieldVirtualColumn;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
+import org.joda.time.Interval;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
@@ -748,7 +749,7 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
GroupByQuery.builder()
.setDataSource("test")
.setGranularity(Granularities.ALL)
- .setInterval(Intervals.ETERNITY)
+ .setInterval(new Interval(TIMESTAMP,
TIMESTAMP.plusDays(1)))
.addDimension("a")
.setDimFilter(new EqualityFilter("a", ColumnType.STRING,
"a", null))
.addAggregator(new LongSumAggregatorFactory("c_sum", "c"))
@@ -769,6 +770,62 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
);
}
+ @Test
+ public void testProjectionSingleDimFilterWithPartialIntervalAligned()
+ {
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(new Interval(TIMESTAMP,
TIMESTAMP.plusHours(1)))
+ .addDimension("a")
+ .setDimFilter(new EqualityFilter("a", ColumnType.STRING,
"a", null))
+ .addAggregator(new LongSumAggregatorFactory("c_sum", "c"))
+ .addAggregator(new LongLastAggregatorFactory("c_last",
"c", null))
+ .build();
+ final ExpectedProjectionGroupBy queryMetrics =
+ new ExpectedProjectionGroupBy("a_hourly_c_sum_with_count_latest");
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, queryMetrics);
+
+ assertCursorProjection(buildSpec, queryMetrics, 1);
+
+ testGroupBy(
+ query,
+ queryMetrics,
+ Collections.singletonList(
+ new Object[]{"a", 4L,
Pair.of(TIMESTAMP.plusMinutes(4).getMillis(), 2L)}
+ )
+ );
+ }
+
+ @Test
+ public void testProjectionSingleDimFilterWithPartialIntervalUnaligned()
+ {
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(new Interval(TIMESTAMP,
TIMESTAMP.plusHours(1).minusMinutes(1)))
+ .addDimension("a")
+ .setDimFilter(new EqualityFilter("a", ColumnType.STRING,
"a", null))
+ .addAggregator(new LongSumAggregatorFactory("c_sum", "c"))
+ .addAggregator(new LongLastAggregatorFactory("c_last",
"c", null))
+ .build();
+ final ExpectedProjectionGroupBy queryMetrics =
+ new ExpectedProjectionGroupBy(null);
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, queryMetrics);
+
+ assertCursorProjection(buildSpec, queryMetrics, 3);
+
+ testGroupBy(
+ query,
+ queryMetrics,
+ Collections.singletonList(
+ new Object[]{"a", 4L,
Pair.of(TIMESTAMP.plusMinutes(4).getMillis(), 2L)}
+ )
+ );
+ }
+
@Test
public void testProjectionSingleDimCount()
{
diff --git
a/processing/src/test/java/org/apache/druid/segment/projections/ProjectionsTest.java
b/processing/src/test/java/org/apache/druid/segment/projections/ProjectionsTest.java
index b8922a9a3f1..7be42c487e9 100644
---
a/processing/src/test/java/org/apache/druid/segment/projections/ProjectionsTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/projections/ProjectionsTest.java
@@ -22,13 +22,19 @@ package org.apache.druid.segment.projections;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.segment.AggregateProjectionMetadata;
import org.apache.druid.segment.CursorBuildSpec;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -68,6 +74,7 @@ class ProjectionsTest
ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
spec.getSchema(),
cursorBuildSpec,
+ Intervals.ETERNITY,
new RowSignatureChecker(baseTable)
);
ProjectionMatch expected = new ProjectionMatch(
@@ -113,6 +120,7 @@ class ProjectionsTest
Projections.matchAggregateProjection(
spec.getSchema(),
cursorBuildSpecNoFilter,
+ Intervals.ETERNITY,
new RowSignatureChecker(baseTable)
)
);
@@ -136,6 +144,7 @@ class ProjectionsTest
ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
spec.getSchema(),
cursorBuildSpecWithFilter,
+ Intervals.ETERNITY,
new RowSignatureChecker(baseTable)
);
ProjectionMatch expected = new ProjectionMatch(
@@ -182,6 +191,7 @@ class ProjectionsTest
Projections.matchAggregateProjection(
spec.getSchema(),
cursorBuildSpecNoFilter,
+ Intervals.ETERNITY,
new RowSignatureChecker(baseTable)
)
);
@@ -206,6 +216,7 @@ class ProjectionsTest
ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
spec.getSchema(),
cursorBuildSpecWithFilter,
+ Intervals.ETERNITY,
new RowSignatureChecker(baseTable)
);
ProjectionMatch expected = new ProjectionMatch(
@@ -220,6 +231,218 @@ class ProjectionsTest
Assertions.assertEquals(expected, projectionMatch);
}
+ @Test
+ public void testSchemaMatchIntervalEternity()
+ {
+ final DateTime time = Granularities.DAY.bucketStart(DateTimes.nowUtc());
+ RowSignature baseTable = RowSignature.builder()
+ .addTimeColumn()
+ .add("a", ColumnType.STRING)
+ .build();
+
+ AggregateProjectionMetadata spec = new AggregateProjectionMetadata(
+ AggregateProjectionSpec.builder("some_projection")
+ .groupingColumns(new StringDimensionSchema("a"))
+ .build()
+ .toMetadataSchema(),
+ 12345
+ );
+
+ CursorBuildSpec cursorBuildSpec = CursorBuildSpec.builder()
+
.setPhysicalColumns(Set.of("a"))
+
.setGroupingColumns(List.of("a"))
+ .build();
+
+ ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
+ spec.getSchema(),
+ cursorBuildSpec,
+ Intervals.ETERNITY,
+ new RowSignatureChecker(baseTable)
+ );
+ ProjectionMatch expected = new ProjectionMatch(
+ CursorBuildSpec.builder()
+ .setPhysicalColumns(Set.of("a"))
+ .setGroupingColumns(List.of("a"))
+ .setAggregators(List.of())
+ .build(),
+ Map.of()
+ );
+ Assertions.assertEquals(expected, projectionMatch);
+
+ // projection with no time column can still match cursor build spec with
eternity interval
+ projectionMatch = Projections.matchAggregateProjection(
+ spec.getSchema(),
+ cursorBuildSpec,
+ new Interval(time, time.plusHours(1)),
+ new RowSignatureChecker(baseTable)
+ );
+
+ Assertions.assertEquals(expected, projectionMatch);
+ }
+
+ @Test
+ public void testSchemaMatchIntervalProjectionGranularityEternity()
+ {
+ final DateTime time = Granularities.DAY.bucketStart(DateTimes.nowUtc());
+
+ RowSignature baseTable = RowSignature.builder()
+ .addTimeColumn()
+ .add("a", ColumnType.STRING)
+ .build();
+
+ // hour granularity projection
+ AggregateProjectionMetadata spec = new AggregateProjectionMetadata(
+ AggregateProjectionSpec.builder("some_projection")
+ .groupingColumns(
+ new
LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME),
+ new StringDimensionSchema("a")
+ )
+ .virtualColumns(
+ Granularities.toVirtualColumn(
+ Granularities.HOUR,
+
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
+ )
+ )
+ .build()
+ .toMetadataSchema(),
+ 12345
+ );
+
+ // eternity interval cursor build spec with granularity set
+ CursorBuildSpec cursorBuildSpec = CursorBuildSpec.builder()
+
.setPhysicalColumns(Set.of("__time", "a"))
+
.setGroupingColumns(List.of("v0", "a"))
+ .setVirtualColumns(
+ VirtualColumns.create(
+
Granularities.toVirtualColumn(Granularities.HOUR, "v0")
+ )
+ )
+ .build();
+
+
+ ProjectionMatch expectedWithGranularity = new ProjectionMatch(
+ CursorBuildSpec.builder()
+ .setPhysicalColumns(Set.of("__time", "a"))
+ .setGroupingColumns(List.of("v0", "a"))
+ .setAggregators(List.of())
+ .build(),
+ Map.of("v0", "__time")
+ );
+
+ ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
+ spec.getSchema(),
+ cursorBuildSpec,
+ Intervals.ETERNITY,
+ new RowSignatureChecker(baseTable)
+ );
+ Assertions.assertEquals(expectedWithGranularity, projectionMatch);
+
+ projectionMatch = Projections.matchAggregateProjection(
+ spec.getSchema(),
+ cursorBuildSpec,
+ new Interval(time, time.plusHours(1)),
+ new RowSignatureChecker(baseTable)
+ );
+
+ Assertions.assertEquals(expectedWithGranularity, projectionMatch);
+
+ }
+
+ @Test
+ public void testSchemaMatchIntervalProjectionGranularity()
+ {
+ final DateTime time = Granularities.DAY.bucketStart(DateTimes.nowUtc());
+
+ RowSignature baseTable = RowSignature.builder()
+ .addTimeColumn()
+ .add("a", ColumnType.STRING)
+ .build();
+
+ // hour granularity projection
+ AggregateProjectionMetadata spec = new AggregateProjectionMetadata(
+ AggregateProjectionSpec.builder("some_projection")
+ .groupingColumns(
+ new
LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME),
+ new StringDimensionSchema("a")
+ )
+ .virtualColumns(
+ Granularities.toVirtualColumn(
+ Granularities.HOUR,
+
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
+ )
+ )
+ .build()
+ .toMetadataSchema(),
+ 12345
+ );
+
+ Interval day = new Interval(time, time.plusDays(1));
+ Interval hour = new Interval(time, time.plusHours(1));
+ Interval partial = new Interval(time, time.plusMinutes(42));
+ // aligned interval cursor build spec
+ CursorBuildSpec cursorBuildSpecHourInterval = CursorBuildSpec.builder()
+
.setInterval(hour)
+
.setPhysicalColumns(Set.of("a"))
+
.setGroupingColumns(List.of("a"))
+ .build();
+
+ ProjectionMatch expected = new ProjectionMatch(
+ CursorBuildSpec.builder()
+ .setInterval(hour)
+ .setPhysicalColumns(Set.of("a"))
+ .setGroupingColumns(List.of("a"))
+ .setAggregators(List.of())
+ .build(),
+ Map.of()
+ );
+ ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
+ spec.getSchema(),
+ cursorBuildSpecHourInterval,
+ day,
+ new RowSignatureChecker(baseTable)
+ );
+ Assertions.assertEquals(expected, projectionMatch);
+
+
+ // partial interval does not align with projection granularity (and does
not contain data interval)
+ CursorBuildSpec cursorBuildSpecPartialInterval = CursorBuildSpec.builder()
+
.setInterval(partial)
+
.setPhysicalColumns(Set.of("a"))
+
.setGroupingColumns(List.of("a"))
+ .build();
+
+ projectionMatch = Projections.matchAggregateProjection(
+ spec.getSchema(),
+ cursorBuildSpecPartialInterval,
+ day,
+ new RowSignatureChecker(baseTable)
+ );
+ Assertions.assertNull(projectionMatch);
+
+ Interval wonky = new Interval(time, time.plusHours(1).plusMinutes(12));
+ CursorBuildSpec cursorBuildSpecUnalignedButContaining =
CursorBuildSpec.builder()
+
.setInterval(wonky)
+
.setPhysicalColumns(Set.of("a"))
+
.setGroupingColumns(List.of("a"))
+
.build();
+ expected = new ProjectionMatch(
+ CursorBuildSpec.builder()
+ .setInterval(wonky)
+ .setPhysicalColumns(Set.of("a"))
+ .setGroupingColumns(List.of("a"))
+ .setAggregators(List.of())
+ .build(),
+ Map.of()
+ );
+ projectionMatch = Projections.matchAggregateProjection(
+ spec.getSchema(),
+ cursorBuildSpecUnalignedButContaining,
+ hour,
+ new RowSignatureChecker(baseTable)
+ );
+ Assertions.assertEquals(expected, projectionMatch);
+ }
+
private static class RowSignatureChecker implements
Projections.PhysicalColumnChecker
{
private final RowSignature rowSignature;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]