This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 547369d6a8c fix projection cursor build spec interval matching to 
consider projection granularity (#18486)
547369d6a8c is described below

commit 547369d6a8cca06adc49b6bfc4f31c27316a7ec6
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Sep 8 21:37:07 2025 -0700

    fix projection cursor build spec interval matching to consider projection 
granularity (#18486)
---
 .../apache/druid/segment/SimpleQueryableIndex.java |   1 +
 .../incremental/OnheapIncrementalIndex.java        |   1 +
 .../druid/segment/projections/Projections.java     |  33 ++-
 .../druid/segment/CursorFactoryProjectionTest.java |  59 +++++-
 .../druid/segment/projections/ProjectionsTest.java | 223 +++++++++++++++++++++
 5 files changed, 314 insertions(+), 3 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java 
b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
index 72ac3b7240e..fa81414dd5a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
@@ -224,6 +224,7 @@ public abstract class SimpleQueryableIndex implements 
QueryableIndex
     return Projections.findMatchingProjection(
         cursorBuildSpec,
         projections,
+        dataInterval,
         (projectionName, columnName) ->
             projectionColumns.get(projectionName).containsKey(columnName) || 
getColumnCapabilities(columnName) == null,
         this::getProjectionQueryableIndex
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index f74812b4b22..3f8768c64ce 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -395,6 +395,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     return Projections.findMatchingProjection(
         buildSpec,
         aggregateProjections,
+        getInterval(),
         (specName, columnName) -> 
projections.get(specName).getDimensionsMap().containsKey(columnName)
                                   || getColumnCapabilities(columnName) == null,
         projections::get
diff --git 
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
 
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
index 8d66f7539e5..7370cff6355 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
@@ -32,6 +32,8 @@ import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.HashMap;
@@ -48,6 +50,7 @@ public class Projections
   public static <T> QueryableProjection<T> findMatchingProjection(
       CursorBuildSpec cursorBuildSpec,
       SortedSet<AggregateProjectionMetadata> projections,
+      Interval dataInterval,
       PhysicalColumnChecker physicalChecker,
       Function<String, T> getRowSelector
   )
@@ -62,7 +65,7 @@ public class Projections
         if (name != null && !name.equals(spec.getSchema().getName())) {
           continue;
         }
-        final ProjectionMatch match = 
matchAggregateProjection(spec.getSchema(), cursorBuildSpec, physicalChecker);
+        final ProjectionMatch match = 
matchAggregateProjection(spec.getSchema(), cursorBuildSpec, dataInterval, 
physicalChecker);
         if (match != null) {
           if (cursorBuildSpec.getQueryMetrics() != null) {
             
cursorBuildSpec.getQueryMetrics().projection(spec.getSchema().getName());
@@ -108,6 +111,7 @@ public class Projections
   public static ProjectionMatch matchAggregateProjection(
       AggregateProjectionMetadata.Schema projection,
       CursorBuildSpec queryCursorBuildSpec,
+      Interval dataInterval,
       PhysicalColumnChecker physicalColumnChecker
   )
   {
@@ -117,6 +121,10 @@ public class Projections
     if 
(CollectionUtils.isNullOrEmpty(queryCursorBuildSpec.getPhysicalColumns())) {
       return null;
     }
+
+    if (isUnalignedInterval(projection, queryCursorBuildSpec, dataInterval)) {
+      return null;
+    }
     ProjectionMatchBuilder matchBuilder = new ProjectionMatchBuilder();
 
     // match virtual columns first, which will populate the 'remapColumns' of 
the match builder
@@ -439,11 +447,32 @@ public class Projections
     return projectionSpec.getSchema().getName() + "/";
   }
 
+  /**
+   * Returns true if the query {@link CursorBuildSpec} interval neither contains the entire 
data interval nor starts and ends on
+   * bucket boundaries of {@link 
AggregateProjectionMetadata.Schema#getEffectiveGranularity()}
+   */
+  private static boolean isUnalignedInterval(
+      AggregateProjectionMetadata.Schema projection,
+      CursorBuildSpec queryCursorBuildSpec,
+      Interval dataInterval
+  )
+  {
+    final Interval queryInterval = queryCursorBuildSpec.getInterval();
+    if (!queryInterval.contains(dataInterval)) {
+      final Granularity granularity = projection.getEffectiveGranularity();
+      final DateTime start = queryInterval.getStart();
+      final DateTime end = queryInterval.getEnd();
+      // the interval filter must align with the projection granularity for a 
match to be valid
+      return !start.equals(granularity.bucketStart(start)) || 
!end.equals(granularity.bucketStart(end));
+    }
+    return false;
+  }
+
   /**
    * Returns true if column is defined in {@link 
AggregateProjectionSpec#getGroupingColumns()} OR if the column does not
    * exist in the base table. Part of determining if a projection can be used 
for a given {@link CursorBuildSpec},
    *
-   * @see #matchAggregateProjection(AggregateProjectionMetadata.Schema, 
CursorBuildSpec, PhysicalColumnChecker)
+   * @see #matchAggregateProjection(AggregateProjectionMetadata.Schema, 
CursorBuildSpec, Interval, PhysicalColumnChecker)
    */
   @FunctionalInterface
   public interface PhysicalColumnChecker
diff --git 
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
 
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index 10664880ebe..ad49dc11eb4 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -82,6 +82,7 @@ import 
org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.segment.virtual.NestedFieldVirtualColumn;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -748,7 +749,7 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
         GroupByQuery.builder()
                     .setDataSource("test")
                     .setGranularity(Granularities.ALL)
-                    .setInterval(Intervals.ETERNITY)
+                    .setInterval(new Interval(TIMESTAMP, 
TIMESTAMP.plusDays(1)))
                     .addDimension("a")
                     .setDimFilter(new EqualityFilter("a", ColumnType.STRING, 
"a", null))
                     .addAggregator(new LongSumAggregatorFactory("c_sum", "c"))
@@ -769,6 +770,62 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
     );
   }
 
+  @Test
+  public void testProjectionSingleDimFilterWithPartialIntervalAligned()
+  {
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(new Interval(TIMESTAMP, 
TIMESTAMP.plusHours(1)))
+                    .addDimension("a")
+                    .setDimFilter(new EqualityFilter("a", ColumnType.STRING, 
"a", null))
+                    .addAggregator(new LongSumAggregatorFactory("c_sum", "c"))
+                    .addAggregator(new LongLastAggregatorFactory("c_last", 
"c", null))
+                    .build();
+    final ExpectedProjectionGroupBy queryMetrics =
+        new ExpectedProjectionGroupBy("a_hourly_c_sum_with_count_latest");
+    final CursorBuildSpec buildSpec = 
GroupingEngine.makeCursorBuildSpec(query, queryMetrics);
+
+    assertCursorProjection(buildSpec, queryMetrics, 1);
+
+    testGroupBy(
+        query,
+        queryMetrics,
+        Collections.singletonList(
+            new Object[]{"a", 4L, 
Pair.of(TIMESTAMP.plusMinutes(4).getMillis(), 2L)}
+        )
+    );
+  }
+
+  @Test
+  public void testProjectionSingleDimFilterWithPartialIntervalUnaligned()
+  {
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(new Interval(TIMESTAMP, 
TIMESTAMP.plusHours(1).minusMinutes(1)))
+                    .addDimension("a")
+                    .setDimFilter(new EqualityFilter("a", ColumnType.STRING, 
"a", null))
+                    .addAggregator(new LongSumAggregatorFactory("c_sum", "c"))
+                    .addAggregator(new LongLastAggregatorFactory("c_last", 
"c", null))
+                    .build();
+    final ExpectedProjectionGroupBy queryMetrics =
+        new ExpectedProjectionGroupBy(null);
+    final CursorBuildSpec buildSpec = 
GroupingEngine.makeCursorBuildSpec(query, queryMetrics);
+
+    assertCursorProjection(buildSpec, queryMetrics, 3);
+
+    testGroupBy(
+        query,
+        queryMetrics,
+        Collections.singletonList(
+            new Object[]{"a", 4L, 
Pair.of(TIMESTAMP.plusMinutes(4).getMillis(), 2L)}
+        )
+    );
+  }
+
   @Test
   public void testProjectionSingleDimCount()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/projections/ProjectionsTest.java
 
b/processing/src/test/java/org/apache/druid/segment/projections/ProjectionsTest.java
index b8922a9a3f1..7be42c487e9 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/projections/ProjectionsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/projections/ProjectionsTest.java
@@ -22,13 +22,19 @@ package org.apache.druid.segment.projections;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.filter.EqualityFilter;
 import org.apache.druid.query.filter.LikeDimFilter;
 import org.apache.druid.segment.AggregateProjectionMetadata;
 import org.apache.druid.segment.CursorBuildSpec;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -68,6 +74,7 @@ class ProjectionsTest
     ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
         spec.getSchema(),
         cursorBuildSpec,
+        Intervals.ETERNITY,
         new RowSignatureChecker(baseTable)
     );
     ProjectionMatch expected = new ProjectionMatch(
@@ -113,6 +120,7 @@ class ProjectionsTest
         Projections.matchAggregateProjection(
             spec.getSchema(),
             cursorBuildSpecNoFilter,
+            Intervals.ETERNITY,
             new RowSignatureChecker(baseTable)
         )
     );
@@ -136,6 +144,7 @@ class ProjectionsTest
     ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
         spec.getSchema(),
         cursorBuildSpecWithFilter,
+        Intervals.ETERNITY,
         new RowSignatureChecker(baseTable)
     );
     ProjectionMatch expected = new ProjectionMatch(
@@ -182,6 +191,7 @@ class ProjectionsTest
         Projections.matchAggregateProjection(
             spec.getSchema(),
             cursorBuildSpecNoFilter,
+            Intervals.ETERNITY,
             new RowSignatureChecker(baseTable)
         )
     );
@@ -206,6 +216,7 @@ class ProjectionsTest
     ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
         spec.getSchema(),
         cursorBuildSpecWithFilter,
+        Intervals.ETERNITY,
         new RowSignatureChecker(baseTable)
     );
     ProjectionMatch expected = new ProjectionMatch(
@@ -220,6 +231,218 @@ class ProjectionsTest
     Assertions.assertEquals(expected, projectionMatch);
   }
 
+  @Test
+  public void testSchemaMatchIntervalEternity()
+  {
+    final DateTime time = Granularities.DAY.bucketStart(DateTimes.nowUtc());
+    RowSignature baseTable = RowSignature.builder()
+                                         .addTimeColumn()
+                                         .add("a", ColumnType.STRING)
+                                         .build();
+
+    AggregateProjectionMetadata spec = new AggregateProjectionMetadata(
+        AggregateProjectionSpec.builder("some_projection")
+                               .groupingColumns(new StringDimensionSchema("a"))
+                               .build()
+                               .toMetadataSchema(),
+        12345
+    );
+
+    CursorBuildSpec cursorBuildSpec = CursorBuildSpec.builder()
+                                                     
.setPhysicalColumns(Set.of("a"))
+                                                     
.setGroupingColumns(List.of("a"))
+                                                     .build();
+
+    ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
+        spec.getSchema(),
+        cursorBuildSpec,
+        Intervals.ETERNITY,
+        new RowSignatureChecker(baseTable)
+    );
+    ProjectionMatch expected = new ProjectionMatch(
+        CursorBuildSpec.builder()
+                       .setPhysicalColumns(Set.of("a"))
+                       .setGroupingColumns(List.of("a"))
+                       .setAggregators(List.of())
+                       .build(),
+        Map.of()
+    );
+    Assertions.assertEquals(expected, projectionMatch);
+
+    // projection with no time column can still match cursor build spec with 
eternity interval
+    projectionMatch = Projections.matchAggregateProjection(
+        spec.getSchema(),
+        cursorBuildSpec,
+        new Interval(time, time.plusHours(1)),
+        new RowSignatureChecker(baseTable)
+    );
+
+    Assertions.assertEquals(expected, projectionMatch);
+  }
+
+  @Test
+  public void testSchemaMatchIntervalProjectionGranularityEternity()
+  {
+    final DateTime time = Granularities.DAY.bucketStart(DateTimes.nowUtc());
+
+    RowSignature baseTable = RowSignature.builder()
+                                         .addTimeColumn()
+                                         .add("a", ColumnType.STRING)
+                                         .build();
+
+    // hour granularity projection
+    AggregateProjectionMetadata spec = new AggregateProjectionMetadata(
+        AggregateProjectionSpec.builder("some_projection")
+                               .groupingColumns(
+                                   new 
LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME),
+                                   new StringDimensionSchema("a")
+                               )
+                               .virtualColumns(
+                                   Granularities.toVirtualColumn(
+                                       Granularities.HOUR,
+                                       
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
+                                   )
+                               )
+                               .build()
+                               .toMetadataSchema(),
+        12345
+    );
+
+    // eternity interval cursor build spec with granularity set
+    CursorBuildSpec cursorBuildSpec = CursorBuildSpec.builder()
+                                                     
.setPhysicalColumns(Set.of("__time", "a"))
+                                                     
.setGroupingColumns(List.of("v0", "a"))
+                                                     .setVirtualColumns(
+                                                         VirtualColumns.create(
+                                                             
Granularities.toVirtualColumn(Granularities.HOUR, "v0")
+                                                         )
+                                                     )
+                                                     .build();
+
+
+    ProjectionMatch expectedWithGranularity = new ProjectionMatch(
+        CursorBuildSpec.builder()
+                       .setPhysicalColumns(Set.of("__time", "a"))
+                       .setGroupingColumns(List.of("v0", "a"))
+                       .setAggregators(List.of())
+                       .build(),
+        Map.of("v0", "__time")
+    );
+
+    ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
+        spec.getSchema(),
+        cursorBuildSpec,
+        Intervals.ETERNITY,
+        new RowSignatureChecker(baseTable)
+    );
+    Assertions.assertEquals(expectedWithGranularity, projectionMatch);
+
+    projectionMatch = Projections.matchAggregateProjection(
+        spec.getSchema(),
+        cursorBuildSpec,
+        new Interval(time, time.plusHours(1)),
+        new RowSignatureChecker(baseTable)
+    );
+
+    Assertions.assertEquals(expectedWithGranularity, projectionMatch);
+
+  }
+
+  @Test
+  public void testSchemaMatchIntervalProjectionGranularity()
+  {
+    final DateTime time = Granularities.DAY.bucketStart(DateTimes.nowUtc());
+
+    RowSignature baseTable = RowSignature.builder()
+                                         .addTimeColumn()
+                                         .add("a", ColumnType.STRING)
+                                         .build();
+
+    // hour granularity projection
+    AggregateProjectionMetadata spec = new AggregateProjectionMetadata(
+        AggregateProjectionSpec.builder("some_projection")
+                               .groupingColumns(
+                                   new 
LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME),
+                                   new StringDimensionSchema("a")
+                               )
+                               .virtualColumns(
+                                   Granularities.toVirtualColumn(
+                                       Granularities.HOUR,
+                                       
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
+                                   )
+                               )
+                               .build()
+                               .toMetadataSchema(),
+        12345
+    );
+
+    Interval day = new Interval(time, time.plusDays(1));
+    Interval hour = new Interval(time, time.plusHours(1));
+    Interval partial = new Interval(time, time.plusMinutes(42));
+    // aligned interval cursor build spec
+    CursorBuildSpec cursorBuildSpecHourInterval = CursorBuildSpec.builder()
+                                                                 
.setInterval(hour)
+                                                                 
.setPhysicalColumns(Set.of("a"))
+                                                                 
.setGroupingColumns(List.of("a"))
+                                                                 .build();
+
+    ProjectionMatch expected = new ProjectionMatch(
+        CursorBuildSpec.builder()
+                       .setInterval(hour)
+                       .setPhysicalColumns(Set.of("a"))
+                       .setGroupingColumns(List.of("a"))
+                       .setAggregators(List.of())
+                       .build(),
+        Map.of()
+    );
+    ProjectionMatch projectionMatch = Projections.matchAggregateProjection(
+        spec.getSchema(),
+        cursorBuildSpecHourInterval,
+        day,
+        new RowSignatureChecker(baseTable)
+    );
+    Assertions.assertEquals(expected, projectionMatch);
+
+
+    // partial interval does not align with projection granularity (and does 
not contain data interval)
+    CursorBuildSpec cursorBuildSpecPartialInterval = CursorBuildSpec.builder()
+                                                                    
.setInterval(partial)
+                                                                    
.setPhysicalColumns(Set.of("a"))
+                                                                    
.setGroupingColumns(List.of("a"))
+                                                                    .build();
+
+    projectionMatch = Projections.matchAggregateProjection(
+        spec.getSchema(),
+        cursorBuildSpecPartialInterval,
+        day,
+        new RowSignatureChecker(baseTable)
+    );
+    Assertions.assertNull(projectionMatch);
+
+    Interval wonky = new Interval(time, time.plusHours(1).plusMinutes(12));
+    CursorBuildSpec cursorBuildSpecUnalignedButContaining = 
CursorBuildSpec.builder()
+                                                                           
.setInterval(wonky)
+                                                                           
.setPhysicalColumns(Set.of("a"))
+                                                                           
.setGroupingColumns(List.of("a"))
+                                                                           
.build();
+    expected = new ProjectionMatch(
+        CursorBuildSpec.builder()
+                       .setInterval(wonky)
+                       .setPhysicalColumns(Set.of("a"))
+                       .setGroupingColumns(List.of("a"))
+                       .setAggregators(List.of())
+                       .build(),
+        Map.of()
+    );
+    projectionMatch = Projections.matchAggregateProjection(
+        spec.getSchema(),
+        cursorBuildSpecUnalignedButContaining,
+        hour,
+        new RowSignatureChecker(baseTable)
+    );
+    Assertions.assertEquals(expected, projectionMatch);
+  }
+
   private static class RowSignatureChecker implements 
Projections.PhysicalColumnChecker
   {
     private final RowSignature rowSignature;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to