This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch 33.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/33.0.0 by this push:
     new f76077af7fc [Backport] fixes and validation for projections with 
rollup base tables (#17866) (#17869)
f76077af7fc is described below

commit f76077af7fc3b6fbf5557d349b56cfb67dd2ac79
Author: Karan Kumar <[email protected]>
AuthorDate: Fri Apr 4 10:14:51 2025 +0530

    [Backport] fixes and validation for projections with rollup base tables 
(#17866) (#17869)
    
    * fixes and validation for projections with rollup base tables
    
    changes:
    * Moved projection dimension initialization into 
`OnHeapAggregateProjection` instead of being done externally
    * Fix projection aggregations to work properly for rollup base tables with 
the same aggregations (swap out projection agg with base table agg to build the 
projection from the raw input row)
    * Added validation for projection aggregations to ensure they either 
reference a base table aggregation (being a combining agg of some base table 
agg) or they reference a base table dimension(s)
    * Added validation for projection virtual columns to ensure they reference 
a base table dimension
    * Tests for rollup projections
    * Tests for `OnHeapAggregateProjection` validations
    
    (cherry picked from commit 30b7bf0c9b17b1087c0ecbac85ecf5aeb3ae99dc)
    
    Co-authored-by: Clint Wylie <[email protected]>
---
 .../druid/segment/AggregateProjectionMetadata.java |  20 ++
 .../segment/incremental/IncrementalIndex.java      |   5 -
 .../incremental/OnHeapAggregateProjection.java     | 230 +++++++++++---
 .../incremental/OnheapIncrementalIndex.java        |  72 +----
 .../druid/segment/projections/Projections.java     |   6 +
 .../druid/segment/CursorFactoryProjectionTest.java | 347 ++++++++++++++++-----
 .../apache/druid/segment/IndexMergerTestBase.java  |   3 +-
 .../incremental/OnheapIncrementalIndexTest.java    | 246 ++++++++++++++-
 8 files changed, 749 insertions(+), 180 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
 
b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
index 0b54a80e8d4..7e9aec5e808 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
@@ -319,6 +319,10 @@ public class AggregateProjectionMetadata
           if (matchBuilder == null) {
             return null;
           }
+          // a query grouping column must also be defined as a projection 
grouping column
+          if (isInvalidGrouping(queryColumn) || 
isInvalidGrouping(matchBuilder.getRemapValue(queryColumn))) {
+            return null;
+          }
         }
       }
       if (queryCursorBuildSpec.getFilter() != null) {
@@ -444,6 +448,22 @@ public class AggregateProjectionMetadata
       }
     }
 
+    /**
+     * Check if a column is either part of {@link #groupingColumns}, or at 
least is not present in
+     * {@link #virtualColumns}. Naively we would just check that grouping 
column contains the column in question,
+     * however we can also use a projection when a column is truly missing. 
{@link #matchRequiredColumn} returns a
+     * match builder if the column is present as either a physical column, or 
a virtual column, but a virtual column
+     * could also be present for an aggregator input, so we must further check 
that a column not in the grouping list
+     * is also not a virtual column, the implication being that it is a 
missing column.
+     */
+    private boolean isInvalidGrouping(@Nullable String columnName)
+    {
+      if (columnName == null) {
+        return false;
+      }
+      return !groupingColumns.contains(columnName) && 
virtualColumns.exists(columnName);
+    }
+
     @Override
     public boolean equals(Object o)
     {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index a0729915454..62a9541b5e9 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -690,11 +690,6 @@ public abstract class IncrementalIndex implements 
IncrementalIndexRowSelector, C
     return numEntries;
   }
 
-  AggregatorFactory[] getMetrics()
-  {
-    return metrics;
-  }
-
   public AtomicLong getBytesInMemory()
   {
     return bytesInMemory;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
index 75650a1c182..ff5b8563f70 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
@@ -21,15 +21,20 @@ package org.apache.druid.segment.incremental;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.query.OrderBy;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorAndSize;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.AggregateProjectionMetadata;
+import org.apache.druid.segment.AutoTypeColumnIndexer;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.EncodedKeyComponent;
+import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.CapabilitiesBasedFormat;
 import org.apache.druid.segment.column.ColumnCapabilities;
@@ -40,6 +45,9 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +55,7 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 
 /**
  * Projection of {@link OnheapIncrementalIndex} for {@link 
org.apache.druid.data.input.impl.AggregateProjectionSpec}
@@ -72,20 +81,31 @@ public class OnHeapAggregateProjection implements 
IncrementalIndexRowSelector
   private final AtomicInteger numEntries = new AtomicInteger(0);
 
   public OnHeapAggregateProjection(
-      AggregateProjectionMetadata.Schema schema,
-      List<IncrementalIndex.DimensionDesc> dimensions,
-      Map<String, IncrementalIndex.DimensionDesc> dimensionsMap,
-      int[] parentDimensionIndex,
+      AggregateProjectionSpec projectionSpec,
+      Function<String, IncrementalIndex.DimensionDesc> 
getBaseTableDimensionDesc,
+      Function<String, AggregatorFactory> getBaseTableAggregatorFactory,
       long minTimestamp,
       boolean useMaxMemoryEstimates,
       long maxBytesPerRowForAggregators
   )
   {
-    this.projectionSchema = schema;
-    this.dimensions = dimensions;
-    this.parentDimensionIndex = parentDimensionIndex;
-    this.dimensionsMap = dimensionsMap;
+    this.projectionSchema = projectionSpec.toMetadataSchema();
     this.minTimestamp = minTimestamp;
+    this.useMaxMemoryEstimates = useMaxMemoryEstimates;
+    this.maxBytesPerRowForAggregators = maxBytesPerRowForAggregators;
+
+    // initialize dimensions, facts holder
+    this.dimensions = new ArrayList<>();
+    // mapping of position in descs on the projection to position in the 
parent incremental index. Like the parent
+    // incremental index, the time (or time-like) column does not have a 
dimension descriptor and is specially
+    // handled as the timestamp of the row. Unlike the parent incremental 
index, an aggregating projection will
+    // always have its time-like column in the grouping columns list, so its 
position in this array specifies -1
+    this.parentDimensionIndex = new 
int[projectionSpec.getGroupingColumns().size()];
+    Arrays.fill(parentDimensionIndex, -1);
+    this.dimensionsMap = new HashMap<>();
+    this.columnFormats = new LinkedHashMap<>();
+
+    initializeAndValidateDimensions(projectionSpec, getBaseTableDimensionDesc, 
useMaxMemoryEstimates);
     final IncrementalIndex.IncrementalIndexRowComparator rowComparator = new 
IncrementalIndex.IncrementalIndexRowComparator(
         projectionSchema.getTimeColumnPosition() < 0 ? dimensions.size() : 
projectionSchema.getTimeColumnPosition(),
         dimensions
@@ -95,42 +115,17 @@ public class OnHeapAggregateProjection implements 
IncrementalIndexRowSelector
         dimensions,
         projectionSchema.getTimeColumnPosition() == 0
     );
-    this.useMaxMemoryEstimates = useMaxMemoryEstimates;
-    this.maxBytesPerRowForAggregators = maxBytesPerRowForAggregators;
 
+    // validate virtual columns refer to base table dimensions and initialize 
selector factory
+    validateVirtualColumns(projectionSpec, getBaseTableDimensionDesc);
     this.virtualSelectorFactory = new 
OnheapIncrementalIndex.CachingColumnSelectorFactory(
-        IncrementalIndex.makeColumnSelectorFactory(schema.getVirtualColumns(), 
inputRowHolder, null)
+        
IncrementalIndex.makeColumnSelectorFactory(projectionSchema.getVirtualColumns(),
 inputRowHolder, null)
     );
+    // initialize aggregators
     this.aggSelectors = new LinkedHashMap<>();
     this.aggregatorsMap = new LinkedHashMap<>();
-    this.aggregatorFactories = new 
AggregatorFactory[schema.getAggregators().length];
-    this.columnFormats = new LinkedHashMap<>();
-    for (IncrementalIndex.DimensionDesc dimension : dimensions) {
-      if (dimension.getName().equals(projectionSchema.getTimeColumnName())) {
-        columnFormats.put(
-            dimension.getName(),
-            new 
CapabilitiesBasedFormat(ColumnCapabilitiesImpl.createDefault().setType(ColumnType.LONG))
-        );
-      } else {
-        columnFormats.put(dimension.getName(), 
dimension.getIndexer().getFormat());
-      }
-    }
-    int i = 0;
-    for (AggregatorFactory agg : schema.getAggregators()) {
-      IncrementalIndex.MetricDesc metricDesc = new 
IncrementalIndex.MetricDesc(aggregatorsMap.size(), agg);
-      aggregatorsMap.put(metricDesc.getName(), metricDesc);
-      columnFormats.put(metricDesc.getName(), new 
CapabilitiesBasedFormat(metricDesc.getCapabilities()));
-      final ColumnSelectorFactory factory;
-      if (agg.getIntermediateType().is(ValueType.COMPLEX)) {
-        factory = new OnheapIncrementalIndex.CachingColumnSelectorFactory(
-            IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, 
inputRowHolder, agg)
-        );
-      } else {
-        factory = virtualSelectorFactory;
-      }
-      aggSelectors.put(agg.getName(), factory);
-      aggregatorFactories[i++] = agg;
-    }
+    this.aggregatorFactories = new 
AggregatorFactory[projectionSchema.getAggregators().length];
+    initializeAndValidateAggregators(projectionSpec, 
getBaseTableDimensionDesc, getBaseTableAggregatorFactory);
   }
 
   /**
@@ -355,6 +350,163 @@ public class OnHeapAggregateProjection implements 
IncrementalIndexRowSelector
     return new AggregateProjectionMetadata(projectionSchema, numEntries.get());
   }
 
+  private void validateVirtualColumns(
+      AggregateProjectionSpec projectionSpec,
+      Function<String, IncrementalIndex.DimensionDesc> 
getBaseTableDimensionDesc
+  )
+  {
+    for (VirtualColumn vc : 
projectionSchema.getVirtualColumns().getVirtualColumns()) {
+      for (String column : vc.requiredColumns()) {
+        if (column.equals(projectionSchema.getTimeColumnName()) || 
column.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+          continue;
+        }
+        if (getBaseTableDimensionDesc.apply(column) == null) {
+          throw InvalidInput.exception(
+              "projection[%s] contains virtual column[%s] that references an 
input[%s] which is not a dimension in the base table",
+              projectionSpec.getName(),
+              vc.getOutputName(),
+              column
+          );
+        }
+      }
+    }
+  }
+
+  private void initializeAndValidateDimensions(
+      AggregateProjectionSpec projectionSpec,
+      Function<String, IncrementalIndex.DimensionDesc> 
getBaseTableDimensionDesc,
+      boolean useMaxMemoryEstimates
+  )
+  {
+    int i = 0;
+    for (DimensionSchema dimension : projectionSpec.getGroupingColumns()) {
+      if (dimension.getName().equals(projectionSchema.getTimeColumnName())) {
+        columnFormats.put(
+            dimension.getName(),
+            new 
CapabilitiesBasedFormat(ColumnCapabilitiesImpl.createDefault().setType(ColumnType.LONG))
+        );
+        continue;
+      }
+      final IncrementalIndex.DimensionDesc parent = 
getBaseTableDimensionDesc.apply(dimension.getName());
+      if (parent == null) {
+        // this dimension only exists in the child, it needs its own handler
+        final IncrementalIndex.DimensionDesc childOnly = new 
IncrementalIndex.DimensionDesc(
+            i++,
+            dimension.getName(),
+            dimension.getDimensionHandler(),
+            useMaxMemoryEstimates
+        );
+
+        dimensions.add(childOnly);
+        dimensionsMap.put(dimension.getName(), childOnly);
+        columnFormats.put(dimension.getName(), 
childOnly.getIndexer().getFormat());
+      } else {
+        if 
(!dimension.getColumnType().equals(parent.getCapabilities().toColumnType())) {
+          // special handle auto column schema, who reports type as json in 
schema, but indexer reports whatever
+          // type it has seen, which is string at this stage
+          boolean allowAuto = 
ColumnType.NESTED_DATA.equals(dimension.getColumnType()) &&
+                              parent.getIndexer() instanceof 
AutoTypeColumnIndexer;
+          InvalidInput.conditionalException(
+              allowAuto,
+              "projection[%s] contains dimension[%s] with different type[%s] 
than type[%s] in base table",
+              projectionSpec.getName(),
+              dimension.getName(),
+              dimension.getColumnType(),
+              parent.getCapabilities().toColumnType()
+          );
+        }
+        // make a new DimensionDesc from the child, containing all of the 
parent's stuff but with the child's position
+        final IncrementalIndex.DimensionDesc child = new 
IncrementalIndex.DimensionDesc(
+            i++,
+            parent.getName(),
+            parent.getHandler(),
+            parent.getIndexer()
+        );
+
+        dimensions.add(child);
+        dimensionsMap.put(dimension.getName(), child);
+        parentDimensionIndex[child.getIndex()] = parent.getIndex();
+        columnFormats.put(dimension.getName(), child.getIndexer().getFormat());
+      }
+    }
+  }
+
+  private void initializeAndValidateAggregators(
+      AggregateProjectionSpec projectionSpec,
+      Function<String, IncrementalIndex.DimensionDesc> 
getBaseTableDimensionDesc,
+      Function<String, AggregatorFactory> getBaseTableAggregatorFactory
+  )
+  {
+    int i = 0;
+    for (AggregatorFactory agg : projectionSchema.getAggregators()) {
+      AggregatorFactory aggToUse = agg;
+      AggregatorFactory baseTableAgg = 
getBaseTableAggregatorFactory.apply(agg.getName());
+      if (baseTableAgg != null) {
+        // if the aggregator references a base table aggregator, it must have 
the same name and be a combining aggregator
+        // of the base table agg
+        if (!agg.equals(baseTableAgg.getCombiningFactory())) {
+          throw InvalidInput.exception(
+              "projection[%s] contains aggregator[%s] that is not the 
'combining' aggregator of base table aggregator[%s]",
+              projectionSpec.getName(),
+              agg.getName(),
+              agg.getName()
+          );
+        }
+        aggToUse = baseTableAgg;
+      } else {
+        // otherwise, the aggregator must reference base table dimensions
+        for (String column : agg.requiredFields()) {
+          if (column.equals(projectionSchema.getTimeColumnName()) || 
column.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+            continue;
+          }
+          if (getBaseTableAggregatorFactory.apply(column) != null) {
+            throw InvalidInput.exception(
+                "projection[%s] contains aggregator[%s] that references 
aggregator[%s] in base table but this is not supported, projection aggregators 
which reference base table aggregates must be 'combining' aggregators with the 
same name as the base table column",
+                projectionSpec.getName(),
+                agg.getName(),
+                column
+            );
+          }
+          if (getBaseTableDimensionDesc.apply(column) == null) {
+            // aggregators with virtual column inputs are not supported yet. 
Supporting this requires some additional
+            // work so that there is a way to do something like rename 
aggregators input column names so the projection
+            // agg which references the projection virtual column can be 
changed to the query virtual column name
+            // (since the query agg references the query virtual column). 
Disallow but provide a helpful error for now
+            if (projectionSchema.getVirtualColumns().exists(column)) {
+              throw InvalidInput.exception(
+                  "projection[%s] contains aggregator[%s] that has required 
field[%s] which is a virtual column, this is not yet supported",
+                  projectionSpec.getName(),
+                  agg.getName(),
+                  column
+              );
+            }
+            // not a virtual column, doesn't refer to a base table dimension 
either, bail instead of ingesting a bunch
+            // of nulls
+            throw InvalidInput.exception(
+                "projection[%s] contains aggregator[%s] that is missing 
required field[%s] in base table",
+                projectionSpec.getName(),
+                agg.getName(),
+                column
+            );
+          }
+        }
+      }
+      IncrementalIndex.MetricDesc metricDesc = new 
IncrementalIndex.MetricDesc(aggregatorsMap.size(), aggToUse);
+      aggregatorsMap.put(metricDesc.getName(), metricDesc);
+      columnFormats.put(metricDesc.getName(), new 
CapabilitiesBasedFormat(metricDesc.getCapabilities()));
+      final ColumnSelectorFactory factory;
+      if (agg.getIntermediateType().is(ValueType.COMPLEX)) {
+        factory = new OnheapIncrementalIndex.CachingColumnSelectorFactory(
+            IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, 
inputRowHolder, aggToUse)
+        );
+      } else {
+        factory = virtualSelectorFactory;
+      }
+      aggSelectors.put(aggToUse.getName(), factory);
+      aggregatorFactories[i++] = aggToUse;
+    }
+  }
+
   private long factorizeAggs(AggregatorFactory[] aggregatorFactories, 
Aggregator[] aggs)
   {
     long totalInitialSizeBytes = 0L;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index d6c67a2fcaa..fa622175184 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -28,9 +28,7 @@ import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
 import org.apache.druid.data.input.MapBasedRow;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
-import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.error.DruidException;
-import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
@@ -42,7 +40,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.segment.AggregateProjectionMetadata;
-import org.apache.druid.segment.AutoTypeColumnIndexer;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.CursorBuildSpec;
@@ -51,7 +48,6 @@ import org.apache.druid.segment.DimensionIndexer;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.Metadata;
 import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.projections.Projections;
 import org.apache.druid.segment.projections.QueryableProjection;
@@ -61,7 +57,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Deque;
@@ -188,62 +183,17 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
       // initialize them all with 0 rows
       AggregateProjectionMetadata.Schema schema = 
projectionSpec.toMetadataSchema();
       aggregateProjections.add(new AggregateProjectionMetadata(schema, 0));
-      final List<DimensionDesc> descs = new ArrayList<>();
-      // mapping of position in descs on the projection to position in the 
parent incremental index. Like the parent
-      // incremental index, the time (or time-like) column does not have a 
dimension descriptor and is specially
-      // handled as the timestamp of the row. Unlike the parent incremental 
index, an aggregating projection will
-      // always have its time-like column in the grouping columns list, so its 
position in this array specifies -1
-      final int[] parentDimIndex = new 
int[projectionSpec.getGroupingColumns().size()];
-      Arrays.fill(parentDimIndex, -1);
-      int i = 0;
-      final Map<String, DimensionDesc> dimensionsMap = new HashMap<>();
-      for (DimensionSchema dimension : projectionSpec.getGroupingColumns()) {
-        if (dimension.getName().equals(schema.getTimeColumnName())) {
-          continue;
-        }
-        final DimensionDesc parent = getDimension(dimension.getName());
-        if (parent == null) {
-          // this dimension only exists in the child, it needs its own handler
-          final DimensionDesc childOnly = new DimensionDesc(
-              i++,
-              dimension.getName(),
-              dimension.getDimensionHandler(),
-              useMaxMemoryEstimates
-          );
-          descs.add(childOnly);
-          dimensionsMap.put(dimension.getName(), childOnly);
-        } else {
-          if 
(!dimension.getColumnType().equals(parent.getCapabilities().toColumnType())) {
-            // special handle auto column schema, who reports type as json in 
schema, but indexer reports whatever
-            // type it has seen, which is string at this stage
-            boolean allowAuto = 
ColumnType.NESTED_DATA.equals(dimension.getColumnType()) &&
-                                parent.getIndexer() instanceof 
AutoTypeColumnIndexer;
-            InvalidInput.conditionalException(
-                allowAuto,
-                "projection[%s] contains dimension[%s] with different type[%s] 
than type[%s] in base table",
-                projectionSpec.getName(),
-                dimension.getName(),
-                dimension.getColumnType(),
-                parent.getCapabilities().toColumnType()
-            );
-          }
-          // make a new DimensionDesc from the child, containing all of the 
parents stuff but with the childs position
-          final DimensionDesc child = new DimensionDesc(
-              i++,
-              parent.getName(),
-              parent.getHandler(),
-              parent.getIndexer()
-          );
-          descs.add(child);
-          dimensionsMap.put(dimension.getName(), child);
-          parentDimIndex[child.getIndex()] = parent.getIndex();
-        }
-      }
+
       final OnHeapAggregateProjection projection = new 
OnHeapAggregateProjection(
-          projectionSpec.toMetadataSchema(),
-          descs,
-          dimensionsMap,
-          parentDimIndex,
+          projectionSpec,
+          this::getDimension,
+          metric -> {
+            MetricDesc desc = getMetric(metric);
+            if (desc != null) {
+              return getMetricAggs()[desc.getIndex()];
+            }
+            return null;
+          },
           incrementalIndexSchema.getMinTimestamp(),
           this.useMaxMemoryEstimates,
           this.maxBytesPerRowForAggregators
@@ -368,7 +318,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     final int priorIndex = facts.getPriorIndex(key);
 
     Aggregator[] aggs;
-    final AggregatorFactory[] metrics = getMetrics();
+    final AggregatorFactory[] metrics = getMetricAggs();
     final AtomicInteger numEntries = getNumEntries();
     if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
       aggs = aggregators.get(priorIndex);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
 
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
index c87733d68e4..ce26f322cf4 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
@@ -179,6 +179,12 @@ public class Projections
       return this;
     }
 
+    @Nullable
+    public String getRemapValue(String queryColumn)
+    {
+      return remapColumns.get(queryColumn);
+    }
+
     /**
      * Add a projection physical column, which will later be added to {@link 
ProjectionMatch#getCursorBuildSpec()} if
      * the projection matches
diff --git 
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
 
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index 403ae7058ba..7bc570a6ed6 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -102,61 +102,69 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
   static final DateTime TIMESTAMP = 
Granularities.DAY.bucket(DateTimes.nowUtc()).getStart();
 
   static final RowSignature ROW_SIGNATURE = RowSignature.builder()
-                                                                .add("a", 
ColumnType.STRING)
-                                                                .add("b", 
ColumnType.STRING)
-                                                                .add("c", 
ColumnType.LONG)
-                                                                .add("d", 
ColumnType.DOUBLE)
-                                                                .build();
-  static final List<InputRow> ROWS = Arrays.asList(
-      new ListBasedInputRow(
-          ROW_SIGNATURE,
-          TIMESTAMP,
-          ROW_SIGNATURE.getColumnNames(),
-          Arrays.asList("a", "aa", 1L, 1.0)
-      ),
-      new ListBasedInputRow(
-          ROW_SIGNATURE,
-          TIMESTAMP.plusMinutes(2),
-          ROW_SIGNATURE.getColumnNames(),
-          Arrays.asList("a", "bb", 1L, 1.1, 1.1f)
-      ),
-      new ListBasedInputRow(
-          ROW_SIGNATURE,
-          TIMESTAMP.plusMinutes(4),
-          ROW_SIGNATURE.getColumnNames(),
-          Arrays.asList("a", "cc", 2L, 2.2, 2.2f)
-      ),
-      new ListBasedInputRow(
-          ROW_SIGNATURE,
-          TIMESTAMP.plusMinutes(6),
-          ROW_SIGNATURE.getColumnNames(),
-          Arrays.asList("b", "aa", 3L, 3.3, 3.3f)
-      ),
-      new ListBasedInputRow(
-          ROW_SIGNATURE,
-          TIMESTAMP.plusMinutes(8),
-          ROW_SIGNATURE.getColumnNames(),
-          Arrays.asList("b", "aa", 4L, 4.4, 4.4f)
-      ),
-      new ListBasedInputRow(
-          ROW_SIGNATURE,
-          TIMESTAMP.plusMinutes(10),
-          ROW_SIGNATURE.getColumnNames(),
-          Arrays.asList("b", "bb", 5L, 5.5, 5.5f)
-      ),
-      new ListBasedInputRow(
-          ROW_SIGNATURE,
-          TIMESTAMP.plusHours(1),
-          ROW_SIGNATURE.getColumnNames(),
-          Arrays.asList("a", "aa", 1L, 1.1, 1.1f)
-      ),
-      new ListBasedInputRow(
-          ROW_SIGNATURE,
-          TIMESTAMP.plusHours(1).plusMinutes(1),
-          ROW_SIGNATURE.getColumnNames(),
-          Arrays.asList("a", "dd", 2L, 2.2, 2.2f)
-      )
-  );
+                                                        .add("a", 
ColumnType.STRING)
+                                                        .add("b", 
ColumnType.STRING)
+                                                        .add("c", 
ColumnType.LONG)
+                                                        .add("d", 
ColumnType.DOUBLE)
+                                                        .add("e", 
ColumnType.FLOAT)
+                                                        .build();
+
+  public static List<InputRow> makeRows(List<String> dimensions)
+  {
+    return Arrays.asList(
+        new ListBasedInputRow(
+            ROW_SIGNATURE,
+            TIMESTAMP,
+            dimensions,
+            Arrays.asList("a", "aa", 1L, 1.0)
+        ),
+        new ListBasedInputRow(
+            ROW_SIGNATURE,
+            TIMESTAMP.plusMinutes(2),
+            dimensions,
+            Arrays.asList("a", "bb", 1L, 1.1, 1.1f)
+        ),
+        new ListBasedInputRow(
+            ROW_SIGNATURE,
+            TIMESTAMP.plusMinutes(4),
+            dimensions,
+            Arrays.asList("a", "cc", 2L, 2.2, 2.2f)
+        ),
+        new ListBasedInputRow(
+            ROW_SIGNATURE,
+            TIMESTAMP.plusMinutes(6),
+            dimensions,
+            Arrays.asList("b", "aa", 3L, 3.3, 3.3f)
+        ),
+        new ListBasedInputRow(
+            ROW_SIGNATURE,
+            TIMESTAMP.plusMinutes(8),
+            dimensions,
+            Arrays.asList("b", "aa", 4L, 4.4, 4.4f)
+        ),
+        new ListBasedInputRow(
+            ROW_SIGNATURE,
+            TIMESTAMP.plusMinutes(10),
+            dimensions,
+            Arrays.asList("b", "bb", 5L, 5.5, 5.5f)
+        ),
+        new ListBasedInputRow(
+            ROW_SIGNATURE,
+            TIMESTAMP.plusHours(1),
+            dimensions,
+            Arrays.asList("a", "aa", 1L, 1.1, 1.1f)
+        ),
+        new ListBasedInputRow(
+            ROW_SIGNATURE,
+            TIMESTAMP.plusHours(1).plusMinutes(1),
+            dimensions,
+            Arrays.asList("a", "dd", 2L, 2.2, 2.2f)
+        )
+    );
+  }
+
+  static final List<InputRow> ROWS = makeRows(ROW_SIGNATURE.getColumnNames());
+  static final List<InputRow> ROLLUP_ROWS = makeRows(ImmutableList.of("a", 
"b"));
 
   private static final List<AggregateProjectionSpec> PROJECTIONS = 
Arrays.asList(
       new AggregateProjectionSpec(
@@ -246,17 +254,65 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
       )
   );
 
-  private static final List<AggregateProjectionSpec> AUTO_PROJECTIONS = 
PROJECTIONS.stream().map(projection -> {
-    return new AggregateProjectionSpec(
-        projection.getName(),
-        projection.getVirtualColumns(),
-        projection.getGroupingColumns()
-                  .stream()
-                  .map(x -> new AutoTypeColumnSchema(x.getName(), null))
-                  .collect(Collectors.toList()),
-        projection.getAggregators()
-    );
-  }).collect(Collectors.toList());
+  private static final List<AggregateProjectionSpec> ROLLUP_PROJECTIONS = 
Arrays.asList(
+      new AggregateProjectionSpec(
+          "a_hourly_c_sum_with_count",
+          VirtualColumns.create(
+              Granularities.toVirtualColumn(Granularities.HOUR, "__gran")
+          ),
+          Arrays.asList(
+              new LongDimensionSchema("__gran"),
+              new StringDimensionSchema("a")
+          ),
+          new AggregatorFactory[]{
+              new CountAggregatorFactory("chocula"),
+              new LongSumAggregatorFactory("sum_c", "sum_c")
+          }
+      ),
+      new AggregateProjectionSpec(
+          "afoo_daily",
+          VirtualColumns.create(
+              new ExpressionVirtualColumn(
+                  "afoo",
+                  "concat(a, 'foo')",
+                  ColumnType.STRING,
+                  TestExprMacroTable.INSTANCE
+              )
+          ),
+          List.of(
+              new StringDimensionSchema("afoo")
+          ),
+          new AggregatorFactory[]{
+              new LongSumAggregatorFactory("sum_c", "sum_c")
+          }
+      )
+  );
+
+  private static final List<AggregateProjectionSpec> AUTO_PROJECTIONS =
+      PROJECTIONS.stream()
+                 .map(projection -> new AggregateProjectionSpec(
+                     projection.getName(),
+                     projection.getVirtualColumns(),
+                     projection.getGroupingColumns()
+                               .stream()
+                               .map(x -> new AutoTypeColumnSchema(x.getName(), 
null))
+                               .collect(Collectors.toList()),
+                     projection.getAggregators()
+                 ))
+                 .collect(Collectors.toList());
+
+  private static final List<AggregateProjectionSpec> AUTO_ROLLUP_PROJECTIONS =
+      ROLLUP_PROJECTIONS.stream()
+                        .map(projection -> new AggregateProjectionSpec(
+                            projection.getName(),
+                            projection.getVirtualColumns(),
+                            projection.getGroupingColumns()
+                                      .stream()
+                                      .map(x -> new 
AutoTypeColumnSchema(x.getName(), null))
+                                      .collect(Collectors.toList()),
+                            projection.getAggregators()
+                        ))
+                        .collect(Collectors.toList());
 
   @Parameterized.Parameters(name = "name: {0}, sortByDim: {3}, autoSchema: 
{4}")
   public static Collection<?> constructorFeeder()
@@ -273,46 +329,85 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
                               new FloatDimensionSchema("e")
                           )
                       );
+
+    final DimensionsSpec.Builder rollupDimensionsBuilder =
+        DimensionsSpec.builder()
+                      .setDimensions(
+                          Arrays.asList(
+                              new StringDimensionSchema("a"),
+                              new StringDimensionSchema("b")
+                          )
+                      );
+    final AggregatorFactory[] rollupAggs = new AggregatorFactory[]{
+        new LongSumAggregatorFactory("sum_c", "c"),
+        new DoubleSumAggregatorFactory("sum_d", "d"),
+        new FloatSumAggregatorFactory("sum_e", "e")
+    };
+
     DimensionsSpec dimsTimeOrdered = dimensionsBuilder.build();
     DimensionsSpec dimsOrdered = 
dimensionsBuilder.setForceSegmentSortByTime(false).build();
 
+    DimensionsSpec rollupDimsTimeOrdered = rollupDimensionsBuilder.build();
+    DimensionsSpec rollupDimsOrdered = 
rollupDimensionsBuilder.setForceSegmentSortByTime(false).build();
+
 
     List<DimensionSchema> autoDims = dimsOrdered.getDimensions()
                                                 .stream()
                                                 .map(x -> new 
AutoTypeColumnSchema(x.getName(), null))
                                                 .collect(Collectors.toList());
+
+    List<DimensionSchema> rollupAutoDims = rollupDimsOrdered.getDimensions()
+                                                            .stream()
+                                                            .map(x -> new 
AutoTypeColumnSchema(x.getName(), null))
+                                                            
.collect(Collectors.toList());
+
     for (boolean incremental : new boolean[]{true, false}) {
-      for (boolean sortByDim : new boolean[]{true, false}) {
-        for (boolean autoSchema : new boolean[]{true, false}) {
+      for (boolean sortByDim : new boolean[]{/*true,*/ false}) {
+        for (boolean autoSchema : new boolean[]{/*true,*/ false}) {
           final DimensionsSpec dims;
+          final DimensionsSpec rollupDims;
           if (sortByDim) {
             if (autoSchema) {
               dims = dimsOrdered.withDimensions(autoDims);
+              rollupDims = rollupDimsOrdered.withDimensions(rollupAutoDims);
             } else {
               dims = dimsOrdered;
+              rollupDims = rollupDimsOrdered;
             }
           } else {
             if (autoSchema) {
               dims = dimsTimeOrdered.withDimensions(autoDims);
+              rollupDims = rollupDimsTimeOrdered.withDimensions(autoDims);
             } else {
               dims = dimsTimeOrdered;
+              rollupDims = rollupDimsTimeOrdered;
             }
           }
           if (incremental) {
             IncrementalIndex index = CLOSER.register(makeBuilder(dims, 
autoSchema).buildIncrementalIndex());
+            IncrementalIndex rollupIndex = CLOSER.register(
+                makeRollupBuilder(rollupDims, rollupAggs, 
autoSchema).buildIncrementalIndex()
+            );
             constructors.add(new Object[]{
                 "incrementalIndex",
                 new IncrementalIndexCursorFactory(index),
                 new IncrementalIndexTimeBoundaryInspector(index),
+                new IncrementalIndexCursorFactory(rollupIndex),
+                new IncrementalIndexTimeBoundaryInspector(rollupIndex),
                 sortByDim,
                 autoSchema
             });
           } else {
             QueryableIndex index = CLOSER.register(makeBuilder(dims, 
autoSchema).buildMMappedIndex());
+            QueryableIndex rollupIndex = CLOSER.register(
+                makeRollupBuilder(rollupDims, rollupAggs, 
autoSchema).buildMMappedIndex()
+            );
             constructors.add(new Object[]{
                 "queryableIndex",
                 new QueryableIndexCursorFactory(index),
                 QueryableIndexTimeBoundaryInspector.create(index),
+                new QueryableIndexCursorFactory(rollupIndex),
+                QueryableIndexTimeBoundaryInspector.create(rollupIndex),
                 sortByDim,
                 autoSchema
             });
@@ -332,6 +427,8 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
 
   public final CursorFactory projectionsCursorFactory;
   public final TimeBoundaryInspector projectionsTimeBoundaryInspector;
+  public final CursorFactory rollupProjectionsCursorFactory;
+  public final TimeBoundaryInspector rollupProjectionsTimeBoundaryInspector;
 
   private final GroupingEngine groupingEngine;
   private final TimeseriesQueryEngine timeseriesEngine;
@@ -347,12 +444,16 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
       String name,
       CursorFactory projectionsCursorFactory,
       TimeBoundaryInspector projectionsTimeBoundaryInspector,
+      CursorFactory rollupProjectionsCursorFactory,
+      TimeBoundaryInspector rollupProjectionsTimeBoundaryInspector,
       boolean sortByDim,
       boolean autoSchema
   )
   {
     this.projectionsCursorFactory = projectionsCursorFactory;
     this.projectionsTimeBoundaryInspector = projectionsTimeBoundaryInspector;
+    this.rollupProjectionsCursorFactory = rollupProjectionsCursorFactory;
+    this.rollupProjectionsTimeBoundaryInspector = 
rollupProjectionsTimeBoundaryInspector;
     this.sortByDim = sortByDim;
     this.autoSchema = autoSchema;
     this.nonBlockingPool = closer.closeLater(
@@ -955,7 +1056,7 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
   @Test
   public void testProjectionSelectionMissingAggregatorButHasAggregatorInput()
   {
-    // d is present as a column on the projection, but since its an aggregate 
projection we cannot use it
+    // e is present as a column on the projection, but since it's an aggregate 
projection we cannot use it
     final GroupByQuery query =
         GroupByQuery.builder()
                     .setDataSource("test")
@@ -986,10 +1087,10 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
 
     final List<ResultRow> results = resultRows.toList();
     Assert.assertEquals(4, results.size());
-    Assert.assertArrayEquals(new Object[]{"aa", 9L, null}, 
results.get(0).getArray());
-    Assert.assertArrayEquals(new Object[]{"bb", 6L, null}, 
results.get(1).getArray());
-    Assert.assertArrayEquals(new Object[]{"cc", 2L, null}, 
results.get(2).getArray());
-    Assert.assertArrayEquals(new Object[]{"dd", 2L, null}, 
results.get(3).getArray());
+    Assert.assertArrayEquals(new Object[]{"aa", 9L, 8.8f}, 
results.get(0).getArray());
+    Assert.assertArrayEquals(new Object[]{"bb", 6L, 6.6f}, 
results.get(1).getArray());
+    Assert.assertArrayEquals(new Object[]{"cc", 2L, 2.2f}, 
results.get(2).getArray());
+    Assert.assertArrayEquals(new Object[]{"dd", 2L, 2.2f}, 
results.get(3).getArray());
   }
 
   @Test
@@ -1217,6 +1318,88 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
     Assert.assertArrayEquals(new 
Object[]{TIMESTAMP.plusHours(1).plusMinutes(1), 2L}, 
getResultArray(results.get(7), querySignature));
   }
 
+  @Test
+  public void testProjectionSingleDimRollupTable()
+  {
+    // test can use the single dimension projection
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ETERNITY)
+                    .addDimension("a")
+                    .addAggregator(new LongSumAggregatorFactory("c_sum", 
"sum_c"))
+                    .build();
+    final CursorBuildSpec buildSpec = 
GroupingEngine.makeCursorBuildSpec(query, null);
+    try (final CursorHolder cursorHolder = 
rollupProjectionsCursorFactory.makeCursorHolder(buildSpec)) {
+      final Cursor cursor = cursorHolder.asCursor();
+      int rowCount = 0;
+      while (!cursor.isDone()) {
+        rowCount++;
+        cursor.advance();
+      }
+      Assert.assertEquals(3, rowCount);
+    }
+    final Sequence<ResultRow> resultRows = groupingEngine.process(
+        query,
+        rollupProjectionsCursorFactory,
+        rollupProjectionsTimeBoundaryInspector,
+        nonBlockingPool,
+        null
+    );
+    final List<ResultRow> results = resultRows.toList();
+    Assert.assertEquals(2, results.size());
+    Assert.assertArrayEquals(
+        new Object[]{"a", 7L},
+        results.get(0).getArray()
+    );
+    Assert.assertArrayEquals(
+        new Object[]{"b", 12L},
+        results.get(1).getArray()
+    );
+  }
+
+  @Test
+  public void testProjectionSingleDimVirtualColumnRollupTable()
+  {
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ETERNITY)
+                    .addDimension("v0")
+                    .setVirtualColumns(new ExpressionVirtualColumn("v0", 
"concat(a, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE))
+                    .addAggregator(new LongSumAggregatorFactory("c_sum", 
"sum_c"))
+                    .build();
+    final CursorBuildSpec buildSpec = 
GroupingEngine.makeCursorBuildSpec(query, null);
+    try (final CursorHolder cursorHolder = 
rollupProjectionsCursorFactory.makeCursorHolder(buildSpec)) {
+      final Cursor cursor = cursorHolder.asCursor();
+      int rowCount = 0;
+      while (!cursor.isDone()) {
+        rowCount++;
+        cursor.advance();
+      }
+      Assert.assertEquals(2, rowCount);
+    }
+    final Sequence<ResultRow> resultRows = groupingEngine.process(
+        query,
+        rollupProjectionsCursorFactory,
+        rollupProjectionsTimeBoundaryInspector,
+        nonBlockingPool,
+        null
+    );
+    final List<ResultRow> results = resultRows.toList();
+    Assert.assertEquals(2, results.size());
+    Assert.assertArrayEquals(
+        new Object[]{"afoo", 7L},
+        results.get(0).getArray()
+    );
+    Assert.assertArrayEquals(
+        new Object[]{"bfoo", 12L},
+        results.get(1).getArray()
+    );
+  }
+
   private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, 
boolean autoSchema)
   {
     File tmp = FileUtils.createTempDir();
@@ -1234,6 +1417,24 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
                        .rows(ROWS);
   }
 
+  private static IndexBuilder makeRollupBuilder(DimensionsSpec dimensionsSpec, 
AggregatorFactory[] aggs, boolean autoSchema)
+  {
+    File tmp = FileUtils.createTempDir();
+    CLOSER.register(tmp::delete);
+    return IndexBuilder.create()
+                       .tmpDir(tmp)
+                       .schema(
+                           IncrementalIndexSchema.builder()
+                                                 
.withDimensionsSpec(dimensionsSpec)
+                                                 .withMetrics(aggs)
+                                                 .withRollup(true)
+                                                 
.withMinTimestamp(TIMESTAMP.getMillis())
+                                                 .withProjections(autoSchema ? 
AUTO_ROLLUP_PROJECTIONS : ROLLUP_PROJECTIONS)
+                                                 .build()
+                       )
+                       .rows(ROLLUP_ROWS);
+  }
+
   private static Set<Object[]> makeArrayResultSet()
   {
     Set<Object[]> resultsInNoParticularOrder = new ObjectOpenCustomHashSet<>(
diff --git 
a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java 
b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index 4b76fa8dd2b..8b27b52b030 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -3005,7 +3005,8 @@ public abstract class IndexMergerTestBase extends 
InitializedNullHandlingTest
                       .setDimensions(
                           Arrays.asList(
                               new StringDimensionSchema("a"),
-                              new StringDimensionSchema("b")
+                              new StringDimensionSchema("b"),
+                              new LongDimensionSchema("c")
                           )
                       );
 
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
index 07ed6f7502f..ff01b8f989c 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
@@ -21,8 +21,22 @@ package org.apache.druid.segment.incremental;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,11 +45,12 @@ public class OnheapIncrementalIndexTest
   private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
 
   @Test
-  public void testSerde() throws JsonProcessingException
+  public void testSpecSerde() throws JsonProcessingException
   {
     OnheapIncrementalIndex.Spec spec = new OnheapIncrementalIndex.Spec(true);
     Assert.assertEquals(spec, 
MAPPER.readValue(MAPPER.writeValueAsString(spec), 
OnheapIncrementalIndex.Spec.class));
   }
+
   @Test
   public void testSpecEqualsAndHashCode()
   {
@@ -43,4 +58,233 @@ public class OnheapIncrementalIndexTest
                   .usingGetClass()
                   .verify();
   }
+
+  @Test
+  public void testBadProjectionMismatchedDimensionTypes()
+  {
+    Throwable t = Assert.assertThrows(
+        DruidException.class,
+        () ->
+            IndexBuilder.create()
+                        .schema(
+                            IncrementalIndexSchema.builder()
+                                                  .withDimensionsSpec(
+                                                      DimensionsSpec.builder()
+                                                                    
.setDimensions(
+                                                                        
ImmutableList.of(
+                                                                            
new StringDimensionSchema("string"),
+                                                                            
new LongDimensionSchema("long")
+                                                                        )
+                                                                    )
+                                                                    .build()
+                                                  )
+                                                  .withProjections(
+                                                      ImmutableList.of(
+                                                          new 
AggregateProjectionSpec(
+                                                              "mismatched 
dims",
+                                                              
VirtualColumns.EMPTY,
+                                                              ImmutableList.of(
+                                                                  new 
LongDimensionSchema("string")
+                                                              ),
+                                                              null
+                                                          )
+                                                      )
+                                                  )
+                                                  .build()
+                        ).buildIncrementalIndex()
+    );
+    Assert.assertEquals(
+        "projection[mismatched dims] contains dimension[string] with different 
type[LONG] than type[STRING] in base table",
+        t.getMessage()
+    );
+  }
+
+  @Test
+  public void testBadProjectionVirtualColumnNoDimension()
+  {
+    Throwable t = Assert.assertThrows(
+        DruidException.class,
+        () ->
+            IndexBuilder.create()
+                        .schema(
+                            IncrementalIndexSchema.builder()
+                                                  .withDimensionsSpec(
+                                                      DimensionsSpec.builder()
+                                                                    
.setDimensions(
+                                                                        
ImmutableList.of(
+                                                                            
new StringDimensionSchema("string"),
+                                                                            
new LongDimensionSchema("long")
+                                                                        )
+                                                                    )
+                                                                    .build()
+                                                  )
+                                                  .withProjections(
+                                                      ImmutableList.of(
+                                                          new 
AggregateProjectionSpec(
+                                                              "sad virtual 
column",
+                                                              
VirtualColumns.create(
+                                                                  new 
ExpressionVirtualColumn(
+                                                                      "v0",
+                                                                      "double",
+                                                                      
ColumnType.DOUBLE,
+                                                                      
TestExprMacroTable.INSTANCE
+                                                                  )
+                                                              ),
+                                                              ImmutableList.of(
+                                                                  new 
LongDimensionSchema("long")
+                                                              ),
+                                                              null
+                                                          )
+                                                      )
+                                                  )
+                                                  .build()
+                        ).buildIncrementalIndex()
+    );
+    Assert.assertEquals(
+        "projection[sad virtual column] contains virtual column[v0] that 
references an input[double] which is not a dimension in the base table",
+        t.getMessage()
+    );
+  }
+
+  @Test
+  public void testBadProjectionRollupMismatchedAggType()
+  {
+    Throwable t = Assert.assertThrows(
+        DruidException.class,
+        () ->
+            IndexBuilder.create()
+                        .schema(
+                            IncrementalIndexSchema.builder()
+                                                  .withDimensionsSpec(
+                                                      DimensionsSpec.builder()
+                                                                    
.setDimensions(
+                                                                        
ImmutableList.of(
+                                                                            
new StringDimensionSchema("string"),
+                                                                            
new LongDimensionSchema("long")
+                                                                        )
+                                                                    )
+                                                                    .build()
+                                                  )
+                                                  .withRollup(true)
+                                                  .withMetrics(
+                                                      new 
DoubleSumAggregatorFactory("sum_double", "sum_double")
+                                                  )
+                                                  .withProjections(
+                                                      ImmutableList.of(
+                                                          new 
AggregateProjectionSpec(
+                                                              "mismatched agg",
+                                                              
VirtualColumns.EMPTY,
+                                                              ImmutableList.of(
+                                                                  new 
StringDimensionSchema("string")
+                                                              ),
+                                                              new 
AggregatorFactory[] {
+                                                                  new 
LongSumAggregatorFactory("sum_double", "sum_double")
+                                                              }
+                                                          )
+                                                      )
+                                                  )
+                                                  .build()
+                        ).buildIncrementalIndex()
+    );
+    Assert.assertEquals(
+        "projection[mismatched agg] contains aggregator[sum_double] that is 
not the 'combining' aggregator of base table aggregator[sum_double]",
+        t.getMessage()
+    );
+  }
+
+  @Test
+  public void testBadProjectionRollupBadAggInput()
+  {
+    Throwable t = Assert.assertThrows(
+        DruidException.class,
+        () ->
+            IndexBuilder.create()
+                        .schema(
+                            IncrementalIndexSchema.builder()
+                                                  .withDimensionsSpec(
+                                                      DimensionsSpec.builder()
+                                                                    
.setDimensions(
+                                                                        
ImmutableList.of(
+                                                                            
new StringDimensionSchema("string"),
+                                                                            
new LongDimensionSchema("long")
+                                                                        )
+                                                                    )
+                                                                    .build()
+                                                  )
+                                                  .withRollup(true)
+                                                  .withMetrics(
+                                                      new 
DoubleSumAggregatorFactory("double", "double")
+                                                  )
+                                                  .withProjections(
+                                                      ImmutableList.of(
+                                                          new 
AggregateProjectionSpec(
+                                                              "renamed agg",
+                                                              
VirtualColumns.EMPTY,
+                                                              ImmutableList.of(
+                                                                  new 
StringDimensionSchema("string")
+                                                              ),
+                                                              new 
AggregatorFactory[] {
+                                                                  new 
LongSumAggregatorFactory("sum_long", "long"),
+                                                                  new 
DoubleSumAggregatorFactory("sum_double", "double")
+                                                              }
+                                                          )
+                                                      )
+                                                  )
+                                                  .build()
+                        ).buildIncrementalIndex()
+    );
+    Assert.assertEquals(
+        "projection[renamed agg] contains aggregator[sum_double] that 
references aggregator[double] in base table but this is not supported, 
projection aggregators which reference base table aggregates must be 
'combining' aggregators with the same name as the base table column",
+        t.getMessage()
+    );
+  }
+
+  @Test
+  public void testBadProjectionVirtualColumnAggInput()
+  {
+    Throwable t = Assert.assertThrows(
+        DruidException.class,
+        () ->
+            IndexBuilder.create()
+                        .schema(
+                            IncrementalIndexSchema.builder()
+                                                  .withDimensionsSpec(
+                                                      DimensionsSpec.builder()
+                                                                    
.setDimensions(
+                                                                        
ImmutableList.of(
+                                                                            
new StringDimensionSchema("string"),
+                                                                            
new LongDimensionSchema("long")
+                                                                        )
+                                                                    )
+                                                                    .build()
+                                                  )
+                                                  .withProjections(
+                                                      ImmutableList.of(
+                                                          new 
AggregateProjectionSpec(
+                                                              "sad agg virtual 
column",
+                                                              
VirtualColumns.create(
+                                                                  new 
ExpressionVirtualColumn(
+                                                                      "v0",
+                                                                      "long + 
100",
+                                                                      
ColumnType.LONG,
+                                                                      
TestExprMacroTable.INSTANCE
+                                                                  )
+                                                              ),
+                                                              ImmutableList.of(
+                                                                  new 
LongDimensionSchema("long")
+                                                              ),
+                                                              new 
AggregatorFactory[] {
+                                                                  new 
LongSumAggregatorFactory("v0_sum", "v0")
+                                                              }
+                                                          )
+                                                      )
+                                                  )
+                                                  .build()
+                        ).buildIncrementalIndex()
+    );
+    Assert.assertEquals(
+        "projection[sad agg virtual column] contains aggregator[v0_sum] that 
is has required field[v0] which is a virtual column, this is not yet supported",
+        t.getMessage()
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to