This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch 33.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/33.0.0 by this push:
new f76077af7fc [Backport] fixes and validation for projections with
rollup base tables (#17866) (#17869)
f76077af7fc is described below
commit f76077af7fc3b6fbf5557d349b56cfb67dd2ac79
Author: Karan Kumar <[email protected]>
AuthorDate: Fri Apr 4 10:14:51 2025 +0530
[Backport] fixes and validation for projections with rollup base tables
(#17866) (#17869)
* fixes and validation for projections with rollup base tables
changes:
* Moved projection dimension initialization into
`OnHeapAggregateProjection` instead of being done externally
* Fix projection aggregations to work properly for rollup base tables with
the same aggregations (swap out projection agg with base table agg to build the
projection from the raw input row)
* Added validation for projection aggregations to ensure they either
reference a base table aggregation (being a combining agg of some base table
agg) or they reference a base table dimension(s)
* Added validation for projection virtual columns to ensure they reference
a base table dimension
* Tests for rollup projections
* Tests for `OnHeapAggregateProjection` validations
(cherry picked from commit 30b7bf0c9b17b1087c0ecbac85ecf5aeb3ae99dc)
Co-authored-by: Clint Wylie <[email protected]>
---
.../druid/segment/AggregateProjectionMetadata.java | 20 ++
.../segment/incremental/IncrementalIndex.java | 5 -
.../incremental/OnHeapAggregateProjection.java | 230 +++++++++++---
.../incremental/OnheapIncrementalIndex.java | 72 +----
.../druid/segment/projections/Projections.java | 6 +
.../druid/segment/CursorFactoryProjectionTest.java | 347 ++++++++++++++++-----
.../apache/druid/segment/IndexMergerTestBase.java | 3 +-
.../incremental/OnheapIncrementalIndexTest.java | 246 ++++++++++++++-
8 files changed, 749 insertions(+), 180 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
index 0b54a80e8d4..7e9aec5e808 100644
---
a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
+++
b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
@@ -319,6 +319,10 @@ public class AggregateProjectionMetadata
if (matchBuilder == null) {
return null;
}
+ // a query grouping column must also be defined as a projection
grouping column
+ if (isInvalidGrouping(queryColumn) ||
isInvalidGrouping(matchBuilder.getRemapValue(queryColumn))) {
+ return null;
+ }
}
}
if (queryCursorBuildSpec.getFilter() != null) {
@@ -444,6 +448,22 @@ public class AggregateProjectionMetadata
}
}
+ /**
+ * Check if a column is either part of {@link #groupingColumns}, or at
least is not present in
+   * {@link #virtualColumns}. Naively we would just check that the grouping
columns contain the column in question,
+ * however we can also use a projection when a column is truly missing.
{@link #matchRequiredColumn} returns a
+ * match builder if the column is present as either a physical column, or
a virtual column, but a virtual column
+ * could also be present for an aggregator input, so we must further check
that a column not in the grouping list
+ * is also not a virtual column, the implication being that it is a
missing column.
+ */
+ private boolean isInvalidGrouping(@Nullable String columnName)
+ {
+ if (columnName == null) {
+ return false;
+ }
+ return !groupingColumns.contains(columnName) &&
virtualColumns.exists(columnName);
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index a0729915454..62a9541b5e9 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -690,11 +690,6 @@ public abstract class IncrementalIndex implements
IncrementalIndexRowSelector, C
return numEntries;
}
- AggregatorFactory[] getMetrics()
- {
- return metrics;
- }
-
public AtomicLong getBytesInMemory()
{
return bytesInMemory;
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
index 75650a1c182..ff5b8563f70 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
@@ -21,15 +21,20 @@ package org.apache.druid.segment.incremental;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.AggregateProjectionMetadata;
+import org.apache.druid.segment.AutoTypeColumnIndexer;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.EncodedKeyComponent;
+import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.CapabilitiesBasedFormat;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -40,6 +45,9 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -47,6 +55,7 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
/**
* Projection of {@link OnheapIncrementalIndex} for {@link
org.apache.druid.data.input.impl.AggregateProjectionSpec}
@@ -72,20 +81,31 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
private final AtomicInteger numEntries = new AtomicInteger(0);
public OnHeapAggregateProjection(
- AggregateProjectionMetadata.Schema schema,
- List<IncrementalIndex.DimensionDesc> dimensions,
- Map<String, IncrementalIndex.DimensionDesc> dimensionsMap,
- int[] parentDimensionIndex,
+ AggregateProjectionSpec projectionSpec,
+ Function<String, IncrementalIndex.DimensionDesc>
getBaseTableDimensionDesc,
+ Function<String, AggregatorFactory> getBaseTableAggregatorFactory,
long minTimestamp,
boolean useMaxMemoryEstimates,
long maxBytesPerRowForAggregators
)
{
- this.projectionSchema = schema;
- this.dimensions = dimensions;
- this.parentDimensionIndex = parentDimensionIndex;
- this.dimensionsMap = dimensionsMap;
+ this.projectionSchema = projectionSpec.toMetadataSchema();
this.minTimestamp = minTimestamp;
+ this.useMaxMemoryEstimates = useMaxMemoryEstimates;
+ this.maxBytesPerRowForAggregators = maxBytesPerRowForAggregators;
+
+ // initialize dimensions, facts holder
+ this.dimensions = new ArrayList<>();
+ // mapping of position in descs on the projection to position in the
parent incremental index. Like the parent
+ // incremental index, the time (or time-like) column does not have a
dimension descriptor and is specially
+ // handled as the timestamp of the row. Unlike the parent incremental
index, an aggregating projection will
+ // always have its time-like column in the grouping columns list, so its
position in this array specifies -1
+ this.parentDimensionIndex = new
int[projectionSpec.getGroupingColumns().size()];
+ Arrays.fill(parentDimensionIndex, -1);
+ this.dimensionsMap = new HashMap<>();
+ this.columnFormats = new LinkedHashMap<>();
+
+ initializeAndValidateDimensions(projectionSpec, getBaseTableDimensionDesc,
useMaxMemoryEstimates);
final IncrementalIndex.IncrementalIndexRowComparator rowComparator = new
IncrementalIndex.IncrementalIndexRowComparator(
projectionSchema.getTimeColumnPosition() < 0 ? dimensions.size() :
projectionSchema.getTimeColumnPosition(),
dimensions
@@ -95,42 +115,17 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
dimensions,
projectionSchema.getTimeColumnPosition() == 0
);
- this.useMaxMemoryEstimates = useMaxMemoryEstimates;
- this.maxBytesPerRowForAggregators = maxBytesPerRowForAggregators;
+ // validate virtual columns refer to base table dimensions and initialize
selector factory
+ validateVirtualColumns(projectionSpec, getBaseTableDimensionDesc);
this.virtualSelectorFactory = new
OnheapIncrementalIndex.CachingColumnSelectorFactory(
- IncrementalIndex.makeColumnSelectorFactory(schema.getVirtualColumns(),
inputRowHolder, null)
+
IncrementalIndex.makeColumnSelectorFactory(projectionSchema.getVirtualColumns(),
inputRowHolder, null)
);
+ // initialize aggregators
this.aggSelectors = new LinkedHashMap<>();
this.aggregatorsMap = new LinkedHashMap<>();
- this.aggregatorFactories = new
AggregatorFactory[schema.getAggregators().length];
- this.columnFormats = new LinkedHashMap<>();
- for (IncrementalIndex.DimensionDesc dimension : dimensions) {
- if (dimension.getName().equals(projectionSchema.getTimeColumnName())) {
- columnFormats.put(
- dimension.getName(),
- new
CapabilitiesBasedFormat(ColumnCapabilitiesImpl.createDefault().setType(ColumnType.LONG))
- );
- } else {
- columnFormats.put(dimension.getName(),
dimension.getIndexer().getFormat());
- }
- }
- int i = 0;
- for (AggregatorFactory agg : schema.getAggregators()) {
- IncrementalIndex.MetricDesc metricDesc = new
IncrementalIndex.MetricDesc(aggregatorsMap.size(), agg);
- aggregatorsMap.put(metricDesc.getName(), metricDesc);
- columnFormats.put(metricDesc.getName(), new
CapabilitiesBasedFormat(metricDesc.getCapabilities()));
- final ColumnSelectorFactory factory;
- if (agg.getIntermediateType().is(ValueType.COMPLEX)) {
- factory = new OnheapIncrementalIndex.CachingColumnSelectorFactory(
- IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY,
inputRowHolder, agg)
- );
- } else {
- factory = virtualSelectorFactory;
- }
- aggSelectors.put(agg.getName(), factory);
- aggregatorFactories[i++] = agg;
- }
+ this.aggregatorFactories = new
AggregatorFactory[projectionSchema.getAggregators().length];
+ initializeAndValidateAggregators(projectionSpec,
getBaseTableDimensionDesc, getBaseTableAggregatorFactory);
}
/**
@@ -355,6 +350,163 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
return new AggregateProjectionMetadata(projectionSchema, numEntries.get());
}
+ private void validateVirtualColumns(
+ AggregateProjectionSpec projectionSpec,
+ Function<String, IncrementalIndex.DimensionDesc>
getBaseTableDimensionDesc
+ )
+ {
+ for (VirtualColumn vc :
projectionSchema.getVirtualColumns().getVirtualColumns()) {
+ for (String column : vc.requiredColumns()) {
+ if (column.equals(projectionSchema.getTimeColumnName()) ||
column.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+ continue;
+ }
+ if (getBaseTableDimensionDesc.apply(column) == null) {
+ throw InvalidInput.exception(
+ "projection[%s] contains virtual column[%s] that references an
input[%s] which is not a dimension in the base table",
+ projectionSpec.getName(),
+ vc.getOutputName(),
+ column
+ );
+ }
+ }
+ }
+ }
+
+ private void initializeAndValidateDimensions(
+ AggregateProjectionSpec projectionSpec,
+ Function<String, IncrementalIndex.DimensionDesc>
getBaseTableDimensionDesc,
+ boolean useMaxMemoryEstimates
+ )
+ {
+ int i = 0;
+ for (DimensionSchema dimension : projectionSpec.getGroupingColumns()) {
+ if (dimension.getName().equals(projectionSchema.getTimeColumnName())) {
+ columnFormats.put(
+ dimension.getName(),
+ new
CapabilitiesBasedFormat(ColumnCapabilitiesImpl.createDefault().setType(ColumnType.LONG))
+ );
+ continue;
+ }
+ final IncrementalIndex.DimensionDesc parent =
getBaseTableDimensionDesc.apply(dimension.getName());
+ if (parent == null) {
+ // this dimension only exists in the child, it needs its own handler
+ final IncrementalIndex.DimensionDesc childOnly = new
IncrementalIndex.DimensionDesc(
+ i++,
+ dimension.getName(),
+ dimension.getDimensionHandler(),
+ useMaxMemoryEstimates
+ );
+
+ dimensions.add(childOnly);
+ dimensionsMap.put(dimension.getName(), childOnly);
+ columnFormats.put(dimension.getName(),
childOnly.getIndexer().getFormat());
+ } else {
+ if
(!dimension.getColumnType().equals(parent.getCapabilities().toColumnType())) {
+          // special handle auto column schema, which reports type as json in
schema, but indexer reports whatever
+ // type it has seen, which is string at this stage
+ boolean allowAuto =
ColumnType.NESTED_DATA.equals(dimension.getColumnType()) &&
+ parent.getIndexer() instanceof
AutoTypeColumnIndexer;
+ InvalidInput.conditionalException(
+ allowAuto,
+ "projection[%s] contains dimension[%s] with different type[%s]
than type[%s] in base table",
+ projectionSpec.getName(),
+ dimension.getName(),
+ dimension.getColumnType(),
+ parent.getCapabilities().toColumnType()
+ );
+ }
+ // make a new DimensionDesc from the child, containing all of the
parents stuff but with the childs position
+ final IncrementalIndex.DimensionDesc child = new
IncrementalIndex.DimensionDesc(
+ i++,
+ parent.getName(),
+ parent.getHandler(),
+ parent.getIndexer()
+ );
+
+ dimensions.add(child);
+ dimensionsMap.put(dimension.getName(), child);
+ parentDimensionIndex[child.getIndex()] = parent.getIndex();
+ columnFormats.put(dimension.getName(), child.getIndexer().getFormat());
+ }
+ }
+ }
+
+ private void initializeAndValidateAggregators(
+ AggregateProjectionSpec projectionSpec,
+ Function<String, IncrementalIndex.DimensionDesc>
getBaseTableDimensionDesc,
+ Function<String, AggregatorFactory> getBaseTableAggregatorFactory
+ )
+ {
+ int i = 0;
+ for (AggregatorFactory agg : projectionSchema.getAggregators()) {
+ AggregatorFactory aggToUse = agg;
+ AggregatorFactory baseTableAgg =
getBaseTableAggregatorFactory.apply(agg.getName());
+ if (baseTableAgg != null) {
+ // if the aggregator references a base table aggregator, it must have
the same name and be a combining aggregator
+ // of the base table agg
+ if (!agg.equals(baseTableAgg.getCombiningFactory())) {
+ throw InvalidInput.exception(
+ "projection[%s] contains aggregator[%s] that is not the
'combining' aggregator of base table aggregator[%s]",
+ projectionSpec.getName(),
+ agg.getName(),
+ agg.getName()
+ );
+ }
+ aggToUse = baseTableAgg;
+ } else {
+ // otherwise, the aggregator must reference base table dimensions
+ for (String column : agg.requiredFields()) {
+ if (column.equals(projectionSchema.getTimeColumnName()) ||
column.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+ continue;
+ }
+ if (getBaseTableAggregatorFactory.apply(column) != null) {
+ throw InvalidInput.exception(
+ "projection[%s] contains aggregator[%s] that references
aggregator[%s] in base table but this is not supported, projection aggregators
which reference base table aggregates must be 'combining' aggregators with the
same name as the base table column",
+ projectionSpec.getName(),
+ agg.getName(),
+ column
+ );
+ }
+ if (getBaseTableDimensionDesc.apply(column) == null) {
+ // aggregators with virtual column inputs are not supported yet.
Supporting this requires some additional
+ // work so that there is a way to do something like rename
aggregators input column names so the projection
+ // agg which references the projection virtual column can be
changed to the query virtual column name
+ // (since the query agg references the query virtual column).
Disallow but provide a helpful error for now
+ if (projectionSchema.getVirtualColumns().exists(column)) {
+ throw InvalidInput.exception(
+            "projection[%s] contains aggregator[%s] that has required
field[%s] which is a virtual column, this is not yet supported",
+ projectionSpec.getName(),
+ agg.getName(),
+ column
+ );
+ }
+ // not a virtual column, doesn't refer to a base table dimension
either, bail instead of ingesting a bunch
+ // of nulls
+ throw InvalidInput.exception(
+ "projection[%s] contains aggregator[%s] that is missing
required field[%s] in base table",
+ projectionSpec.getName(),
+ agg.getName(),
+ column
+ );
+ }
+ }
+ }
+ IncrementalIndex.MetricDesc metricDesc = new
IncrementalIndex.MetricDesc(aggregatorsMap.size(), aggToUse);
+ aggregatorsMap.put(metricDesc.getName(), metricDesc);
+ columnFormats.put(metricDesc.getName(), new
CapabilitiesBasedFormat(metricDesc.getCapabilities()));
+ final ColumnSelectorFactory factory;
+ if (agg.getIntermediateType().is(ValueType.COMPLEX)) {
+ factory = new OnheapIncrementalIndex.CachingColumnSelectorFactory(
+ IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY,
inputRowHolder, aggToUse)
+ );
+ } else {
+ factory = virtualSelectorFactory;
+ }
+ aggSelectors.put(aggToUse.getName(), factory);
+ aggregatorFactories[i++] = aggToUse;
+ }
+ }
+
private long factorizeAggs(AggregatorFactory[] aggregatorFactories,
Aggregator[] aggs)
{
long totalInitialSizeBytes = 0L;
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index d6c67a2fcaa..fa622175184 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -28,9 +28,7 @@ import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
-import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.error.DruidException;
-import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
@@ -42,7 +40,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.AggregateProjectionMetadata;
-import org.apache.druid.segment.AutoTypeColumnIndexer;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.CursorBuildSpec;
@@ -51,7 +48,6 @@ import org.apache.druid.segment.DimensionIndexer;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.projections.Projections;
import org.apache.druid.segment.projections.QueryableProjection;
@@ -61,7 +57,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
@@ -188,62 +183,17 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
// initialize them all with 0 rows
AggregateProjectionMetadata.Schema schema =
projectionSpec.toMetadataSchema();
aggregateProjections.add(new AggregateProjectionMetadata(schema, 0));
- final List<DimensionDesc> descs = new ArrayList<>();
- // mapping of position in descs on the projection to position in the
parent incremental index. Like the parent
- // incremental index, the time (or time-like) column does not have a
dimension descriptor and is specially
- // handled as the timestamp of the row. Unlike the parent incremental
index, an aggregating projection will
- // always have its time-like column in the grouping columns list, so its
position in this array specifies -1
- final int[] parentDimIndex = new
int[projectionSpec.getGroupingColumns().size()];
- Arrays.fill(parentDimIndex, -1);
- int i = 0;
- final Map<String, DimensionDesc> dimensionsMap = new HashMap<>();
- for (DimensionSchema dimension : projectionSpec.getGroupingColumns()) {
- if (dimension.getName().equals(schema.getTimeColumnName())) {
- continue;
- }
- final DimensionDesc parent = getDimension(dimension.getName());
- if (parent == null) {
- // this dimension only exists in the child, it needs its own handler
- final DimensionDesc childOnly = new DimensionDesc(
- i++,
- dimension.getName(),
- dimension.getDimensionHandler(),
- useMaxMemoryEstimates
- );
- descs.add(childOnly);
- dimensionsMap.put(dimension.getName(), childOnly);
- } else {
- if
(!dimension.getColumnType().equals(parent.getCapabilities().toColumnType())) {
- // special handle auto column schema, who reports type as json in
schema, but indexer reports whatever
- // type it has seen, which is string at this stage
- boolean allowAuto =
ColumnType.NESTED_DATA.equals(dimension.getColumnType()) &&
- parent.getIndexer() instanceof
AutoTypeColumnIndexer;
- InvalidInput.conditionalException(
- allowAuto,
- "projection[%s] contains dimension[%s] with different type[%s]
than type[%s] in base table",
- projectionSpec.getName(),
- dimension.getName(),
- dimension.getColumnType(),
- parent.getCapabilities().toColumnType()
- );
- }
- // make a new DimensionDesc from the child, containing all of the
parents stuff but with the childs position
- final DimensionDesc child = new DimensionDesc(
- i++,
- parent.getName(),
- parent.getHandler(),
- parent.getIndexer()
- );
- descs.add(child);
- dimensionsMap.put(dimension.getName(), child);
- parentDimIndex[child.getIndex()] = parent.getIndex();
- }
- }
+
final OnHeapAggregateProjection projection = new
OnHeapAggregateProjection(
- projectionSpec.toMetadataSchema(),
- descs,
- dimensionsMap,
- parentDimIndex,
+ projectionSpec,
+ this::getDimension,
+ metric -> {
+ MetricDesc desc = getMetric(metric);
+ if (desc != null) {
+ return getMetricAggs()[desc.getIndex()];
+ }
+ return null;
+ },
incrementalIndexSchema.getMinTimestamp(),
this.useMaxMemoryEstimates,
this.maxBytesPerRowForAggregators
@@ -368,7 +318,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
final int priorIndex = facts.getPriorIndex(key);
Aggregator[] aggs;
- final AggregatorFactory[] metrics = getMetrics();
+ final AggregatorFactory[] metrics = getMetricAggs();
final AtomicInteger numEntries = getNumEntries();
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
aggs = aggregators.get(priorIndex);
diff --git
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
index c87733d68e4..ce26f322cf4 100644
---
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
+++
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
@@ -179,6 +179,12 @@ public class Projections
return this;
}
+ @Nullable
+ public String getRemapValue(String queryColumn)
+ {
+ return remapColumns.get(queryColumn);
+ }
+
/**
* Add a projection physical column, which will later be added to {@link
ProjectionMatch#getCursorBuildSpec()} if
* the projection matches
diff --git
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index 403ae7058ba..7bc570a6ed6 100644
---
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -102,61 +102,69 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
static final DateTime TIMESTAMP =
Granularities.DAY.bucket(DateTimes.nowUtc()).getStart();
static final RowSignature ROW_SIGNATURE = RowSignature.builder()
- .add("a",
ColumnType.STRING)
- .add("b",
ColumnType.STRING)
- .add("c",
ColumnType.LONG)
- .add("d",
ColumnType.DOUBLE)
- .build();
- static final List<InputRow> ROWS = Arrays.asList(
- new ListBasedInputRow(
- ROW_SIGNATURE,
- TIMESTAMP,
- ROW_SIGNATURE.getColumnNames(),
- Arrays.asList("a", "aa", 1L, 1.0)
- ),
- new ListBasedInputRow(
- ROW_SIGNATURE,
- TIMESTAMP.plusMinutes(2),
- ROW_SIGNATURE.getColumnNames(),
- Arrays.asList("a", "bb", 1L, 1.1, 1.1f)
- ),
- new ListBasedInputRow(
- ROW_SIGNATURE,
- TIMESTAMP.plusMinutes(4),
- ROW_SIGNATURE.getColumnNames(),
- Arrays.asList("a", "cc", 2L, 2.2, 2.2f)
- ),
- new ListBasedInputRow(
- ROW_SIGNATURE,
- TIMESTAMP.plusMinutes(6),
- ROW_SIGNATURE.getColumnNames(),
- Arrays.asList("b", "aa", 3L, 3.3, 3.3f)
- ),
- new ListBasedInputRow(
- ROW_SIGNATURE,
- TIMESTAMP.plusMinutes(8),
- ROW_SIGNATURE.getColumnNames(),
- Arrays.asList("b", "aa", 4L, 4.4, 4.4f)
- ),
- new ListBasedInputRow(
- ROW_SIGNATURE,
- TIMESTAMP.plusMinutes(10),
- ROW_SIGNATURE.getColumnNames(),
- Arrays.asList("b", "bb", 5L, 5.5, 5.5f)
- ),
- new ListBasedInputRow(
- ROW_SIGNATURE,
- TIMESTAMP.plusHours(1),
- ROW_SIGNATURE.getColumnNames(),
- Arrays.asList("a", "aa", 1L, 1.1, 1.1f)
- ),
- new ListBasedInputRow(
- ROW_SIGNATURE,
- TIMESTAMP.plusHours(1).plusMinutes(1),
- ROW_SIGNATURE.getColumnNames(),
- Arrays.asList("a", "dd", 2L, 2.2, 2.2f)
- )
- );
+ .add("a",
ColumnType.STRING)
+ .add("b",
ColumnType.STRING)
+ .add("c",
ColumnType.LONG)
+ .add("d",
ColumnType.DOUBLE)
+ .add("e",
ColumnType.FLOAT)
+ .build();
+
+ public static List<InputRow> makeRows(List<String> dimensions)
+ {
+ return Arrays.asList(
+ new ListBasedInputRow(
+ ROW_SIGNATURE,
+ TIMESTAMP,
+ dimensions,
+ Arrays.asList("a", "aa", 1L, 1.0)
+ ),
+ new ListBasedInputRow(
+ ROW_SIGNATURE,
+ TIMESTAMP.plusMinutes(2),
+ dimensions,
+ Arrays.asList("a", "bb", 1L, 1.1, 1.1f)
+ ),
+ new ListBasedInputRow(
+ ROW_SIGNATURE,
+ TIMESTAMP.plusMinutes(4),
+ dimensions,
+ Arrays.asList("a", "cc", 2L, 2.2, 2.2f)
+ ),
+ new ListBasedInputRow(
+ ROW_SIGNATURE,
+ TIMESTAMP.plusMinutes(6),
+ dimensions,
+ Arrays.asList("b", "aa", 3L, 3.3, 3.3f)
+ ),
+ new ListBasedInputRow(
+ ROW_SIGNATURE,
+ TIMESTAMP.plusMinutes(8),
+ dimensions,
+ Arrays.asList("b", "aa", 4L, 4.4, 4.4f)
+ ),
+ new ListBasedInputRow(
+ ROW_SIGNATURE,
+ TIMESTAMP.plusMinutes(10),
+ dimensions,
+ Arrays.asList("b", "bb", 5L, 5.5, 5.5f)
+ ),
+ new ListBasedInputRow(
+ ROW_SIGNATURE,
+ TIMESTAMP.plusHours(1),
+ dimensions,
+ Arrays.asList("a", "aa", 1L, 1.1, 1.1f)
+ ),
+ new ListBasedInputRow(
+ ROW_SIGNATURE,
+ TIMESTAMP.plusHours(1).plusMinutes(1),
+ dimensions,
+ Arrays.asList("a", "dd", 2L, 2.2, 2.2f)
+ )
+ );
+ }
+
+ static final List<InputRow> ROWS = makeRows(ROW_SIGNATURE.getColumnNames());
+ static final List<InputRow> ROLLUP_ROWS = makeRows(ImmutableList.of("a",
"b"));
private static final List<AggregateProjectionSpec> PROJECTIONS =
Arrays.asList(
new AggregateProjectionSpec(
@@ -246,17 +254,65 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
)
);
- private static final List<AggregateProjectionSpec> AUTO_PROJECTIONS =
PROJECTIONS.stream().map(projection -> {
- return new AggregateProjectionSpec(
- projection.getName(),
- projection.getVirtualColumns(),
- projection.getGroupingColumns()
- .stream()
- .map(x -> new AutoTypeColumnSchema(x.getName(), null))
- .collect(Collectors.toList()),
- projection.getAggregators()
- );
- }).collect(Collectors.toList());
+ private static final List<AggregateProjectionSpec> ROLLUP_PROJECTIONS =
Arrays.asList(
+ new AggregateProjectionSpec(
+ "a_hourly_c_sum_with_count",
+ VirtualColumns.create(
+ Granularities.toVirtualColumn(Granularities.HOUR, "__gran")
+ ),
+ Arrays.asList(
+ new LongDimensionSchema("__gran"),
+ new StringDimensionSchema("a")
+ ),
+ new AggregatorFactory[]{
+ new CountAggregatorFactory("chocula"),
+ new LongSumAggregatorFactory("sum_c", "sum_c")
+ }
+ ),
+ new AggregateProjectionSpec(
+ "afoo_daily",
+ VirtualColumns.create(
+ new ExpressionVirtualColumn(
+ "afoo",
+ "concat(a, 'foo')",
+ ColumnType.STRING,
+ TestExprMacroTable.INSTANCE
+ )
+ ),
+ List.of(
+ new StringDimensionSchema("afoo")
+ ),
+ new AggregatorFactory[]{
+ new LongSumAggregatorFactory("sum_c", "sum_c")
+ }
+ )
+ );
+
+ private static final List<AggregateProjectionSpec> AUTO_PROJECTIONS =
+ PROJECTIONS.stream()
+ .map(projection -> new AggregateProjectionSpec(
+ projection.getName(),
+ projection.getVirtualColumns(),
+ projection.getGroupingColumns()
+ .stream()
+ .map(x -> new AutoTypeColumnSchema(x.getName(),
null))
+ .collect(Collectors.toList()),
+ projection.getAggregators()
+ ))
+ .collect(Collectors.toList());
+
+ private static final List<AggregateProjectionSpec> AUTO_ROLLUP_PROJECTIONS =
+ ROLLUP_PROJECTIONS.stream()
+ .map(projection -> new AggregateProjectionSpec(
+ projection.getName(),
+ projection.getVirtualColumns(),
+ projection.getGroupingColumns()
+ .stream()
+ .map(x -> new
AutoTypeColumnSchema(x.getName(), null))
+ .collect(Collectors.toList()),
+ projection.getAggregators()
+ ))
+ .collect(Collectors.toList());
@Parameterized.Parameters(name = "name: {0}, sortByDim: {3}, autoSchema:
{4}")
public static Collection<?> constructorFeeder()
@@ -273,46 +329,85 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
new FloatDimensionSchema("e")
)
);
+
+ final DimensionsSpec.Builder rollupDimensionsBuilder =
+ DimensionsSpec.builder()
+ .setDimensions(
+ Arrays.asList(
+ new StringDimensionSchema("a"),
+ new StringDimensionSchema("b")
+ )
+ );
+ final AggregatorFactory[] rollupAggs = new AggregatorFactory[]{
+ new LongSumAggregatorFactory("sum_c", "c"),
+ new DoubleSumAggregatorFactory("sum_d", "d"),
+ new FloatSumAggregatorFactory("sum_e", "e")
+ };
+
DimensionsSpec dimsTimeOrdered = dimensionsBuilder.build();
DimensionsSpec dimsOrdered =
dimensionsBuilder.setForceSegmentSortByTime(false).build();
+ DimensionsSpec rollupDimsTimeOrdered = rollupDimensionsBuilder.build();
+ DimensionsSpec rollupDimsOrdered =
rollupDimensionsBuilder.setForceSegmentSortByTime(false).build();
+
List<DimensionSchema> autoDims = dimsOrdered.getDimensions()
.stream()
.map(x -> new
AutoTypeColumnSchema(x.getName(), null))
.collect(Collectors.toList());
+
+ List<DimensionSchema> rollupAutoDims = rollupDimsOrdered.getDimensions()
+ .stream()
+ .map(x -> new
AutoTypeColumnSchema(x.getName(), null))
+
.collect(Collectors.toList());
+
for (boolean incremental : new boolean[]{true, false}) {
- for (boolean sortByDim : new boolean[]{true, false}) {
- for (boolean autoSchema : new boolean[]{true, false}) {
+ for (boolean sortByDim : new boolean[]{/*true,*/ false}) {
+ for (boolean autoSchema : new boolean[]{/*true,*/ false}) {
final DimensionsSpec dims;
+ final DimensionsSpec rollupDims;
if (sortByDim) {
if (autoSchema) {
dims = dimsOrdered.withDimensions(autoDims);
+ rollupDims = rollupDimsOrdered.withDimensions(rollupAutoDims);
} else {
dims = dimsOrdered;
+ rollupDims = rollupDimsOrdered;
}
} else {
if (autoSchema) {
dims = dimsTimeOrdered.withDimensions(autoDims);
+ rollupDims = rollupDimsTimeOrdered.withDimensions(autoDims);
} else {
dims = dimsTimeOrdered;
+ rollupDims = rollupDimsTimeOrdered;
}
}
if (incremental) {
IncrementalIndex index = CLOSER.register(makeBuilder(dims,
autoSchema).buildIncrementalIndex());
+ IncrementalIndex rollupIndex = CLOSER.register(
+ makeRollupBuilder(rollupDims, rollupAggs,
autoSchema).buildIncrementalIndex()
+ );
constructors.add(new Object[]{
"incrementalIndex",
new IncrementalIndexCursorFactory(index),
new IncrementalIndexTimeBoundaryInspector(index),
+ new IncrementalIndexCursorFactory(rollupIndex),
+ new IncrementalIndexTimeBoundaryInspector(rollupIndex),
sortByDim,
autoSchema
});
} else {
QueryableIndex index = CLOSER.register(makeBuilder(dims,
autoSchema).buildMMappedIndex());
+ QueryableIndex rollupIndex = CLOSER.register(
+ makeRollupBuilder(rollupDims, rollupAggs,
autoSchema).buildMMappedIndex()
+ );
constructors.add(new Object[]{
"queryableIndex",
new QueryableIndexCursorFactory(index),
QueryableIndexTimeBoundaryInspector.create(index),
+ new QueryableIndexCursorFactory(rollupIndex),
+ QueryableIndexTimeBoundaryInspector.create(rollupIndex),
sortByDim,
autoSchema
});
@@ -332,6 +427,8 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
public final CursorFactory projectionsCursorFactory;
public final TimeBoundaryInspector projectionsTimeBoundaryInspector;
+ public final CursorFactory rollupProjectionsCursorFactory;
+ public final TimeBoundaryInspector rollupProjectionsTimeBoundaryInspector;
private final GroupingEngine groupingEngine;
private final TimeseriesQueryEngine timeseriesEngine;
@@ -347,12 +444,16 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
String name,
CursorFactory projectionsCursorFactory,
TimeBoundaryInspector projectionsTimeBoundaryInspector,
+ CursorFactory rollupProjectionsCursorFactory,
+ TimeBoundaryInspector rollupProjectionsTimeBoundaryInspector,
boolean sortByDim,
boolean autoSchema
)
{
this.projectionsCursorFactory = projectionsCursorFactory;
this.projectionsTimeBoundaryInspector = projectionsTimeBoundaryInspector;
+ this.rollupProjectionsCursorFactory = rollupProjectionsCursorFactory;
+ this.rollupProjectionsTimeBoundaryInspector =
rollupProjectionsTimeBoundaryInspector;
this.sortByDim = sortByDim;
this.autoSchema = autoSchema;
this.nonBlockingPool = closer.closeLater(
@@ -955,7 +1056,7 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
@Test
public void testProjectionSelectionMissingAggregatorButHasAggregatorInput()
{
- // d is present as a column on the projection, but since its an aggregate
projection we cannot use it
+ // e is present as a column on the projection, but since its an aggregate
projection we cannot use it
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource("test")
@@ -986,10 +1087,10 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
final List<ResultRow> results = resultRows.toList();
Assert.assertEquals(4, results.size());
- Assert.assertArrayEquals(new Object[]{"aa", 9L, null},
results.get(0).getArray());
- Assert.assertArrayEquals(new Object[]{"bb", 6L, null},
results.get(1).getArray());
- Assert.assertArrayEquals(new Object[]{"cc", 2L, null},
results.get(2).getArray());
- Assert.assertArrayEquals(new Object[]{"dd", 2L, null},
results.get(3).getArray());
+ Assert.assertArrayEquals(new Object[]{"aa", 9L, 8.8f},
results.get(0).getArray());
+ Assert.assertArrayEquals(new Object[]{"bb", 6L, 6.6f},
results.get(1).getArray());
+ Assert.assertArrayEquals(new Object[]{"cc", 2L, 2.2f},
results.get(2).getArray());
+ Assert.assertArrayEquals(new Object[]{"dd", 2L, 2.2f},
results.get(3).getArray());
}
@Test
@@ -1217,6 +1318,88 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
Assert.assertArrayEquals(new
Object[]{TIMESTAMP.plusHours(1).plusMinutes(1), 2L},
getResultArray(results.get(7), querySignature));
}
+ @Test
+ public void testProjectionSingleDimRollupTable()
+ {
+ // test can use the single dimension projection
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .addDimension("a")
+ .addAggregator(new LongSumAggregatorFactory("c_sum",
"sum_c"))
+ .build();
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, null);
+ try (final CursorHolder cursorHolder =
rollupProjectionsCursorFactory.makeCursorHolder(buildSpec)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(3, rowCount);
+ }
+ final Sequence<ResultRow> resultRows = groupingEngine.process(
+ query,
+ rollupProjectionsCursorFactory,
+ rollupProjectionsTimeBoundaryInspector,
+ nonBlockingPool,
+ null
+ );
+ final List<ResultRow> results = resultRows.toList();
+ Assert.assertEquals(2, results.size());
+ Assert.assertArrayEquals(
+ new Object[]{"a", 7L},
+ results.get(0).getArray()
+ );
+ Assert.assertArrayEquals(
+ new Object[]{"b", 12L},
+ results.get(1).getArray()
+ );
+ }
+
+ @Test
+ public void testProjectionSingleDimVirtualColumnRollupTable()
+ {
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .addDimension("v0")
+ .setVirtualColumns(new ExpressionVirtualColumn("v0",
"concat(a, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE))
+ .addAggregator(new LongSumAggregatorFactory("c_sum",
"sum_c"))
+ .build();
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, null);
+ try (final CursorHolder cursorHolder =
rollupProjectionsCursorFactory.makeCursorHolder(buildSpec)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(2, rowCount);
+ }
+ final Sequence<ResultRow> resultRows = groupingEngine.process(
+ query,
+ rollupProjectionsCursorFactory,
+ rollupProjectionsTimeBoundaryInspector,
+ nonBlockingPool,
+ null
+ );
+ final List<ResultRow> results = resultRows.toList();
+ Assert.assertEquals(2, results.size());
+ Assert.assertArrayEquals(
+ new Object[]{"afoo", 7L},
+ results.get(0).getArray()
+ );
+ Assert.assertArrayEquals(
+ new Object[]{"bfoo", 12L},
+ results.get(1).getArray()
+ );
+ }
+
private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec,
boolean autoSchema)
{
File tmp = FileUtils.createTempDir();
@@ -1234,6 +1417,24 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
.rows(ROWS);
}
+ private static IndexBuilder makeRollupBuilder(DimensionsSpec dimensionsSpec,
AggregatorFactory[] aggs, boolean autoSchema)
+ {
+ File tmp = FileUtils.createTempDir();
+ CLOSER.register(tmp::delete);
+ return IndexBuilder.create()
+ .tmpDir(tmp)
+ .schema(
+ IncrementalIndexSchema.builder()
+
.withDimensionsSpec(dimensionsSpec)
+ .withMetrics(aggs)
+ .withRollup(true)
+
.withMinTimestamp(TIMESTAMP.getMillis())
+ .withProjections(autoSchema ?
AUTO_ROLLUP_PROJECTIONS : ROLLUP_PROJECTIONS)
+ .build()
+ )
+ .rows(ROLLUP_ROWS);
+ }
+
private static Set<Object[]> makeArrayResultSet()
{
Set<Object[]> resultsInNoParticularOrder = new ObjectOpenCustomHashSet<>(
diff --git
a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index 4b76fa8dd2b..8b27b52b030 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -3005,7 +3005,8 @@ public abstract class IndexMergerTestBase extends
InitializedNullHandlingTest
.setDimensions(
Arrays.asList(
new StringDimensionSchema("a"),
- new StringDimensionSchema("b")
+ new StringDimensionSchema("b"),
+ new LongDimensionSchema("c")
)
);
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
index 07ed6f7502f..ff01b8f989c 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
@@ -21,8 +21,22 @@ package org.apache.druid.segment.incremental;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.junit.Assert;
import org.junit.Test;
@@ -31,11 +45,12 @@ public class OnheapIncrementalIndexTest
private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
@Test
- public void testSerde() throws JsonProcessingException
+ public void testSpecSerde() throws JsonProcessingException
{
OnheapIncrementalIndex.Spec spec = new OnheapIncrementalIndex.Spec(true);
Assert.assertEquals(spec,
MAPPER.readValue(MAPPER.writeValueAsString(spec),
OnheapIncrementalIndex.Spec.class));
}
+
@Test
public void testSpecEqualsAndHashCode()
{
@@ -43,4 +58,233 @@ public class OnheapIncrementalIndexTest
.usingGetClass()
.verify();
}
+
+ @Test
+ public void testBadProjectionMismatchedDimensionTypes()
+ {
+ Throwable t = Assert.assertThrows(
+ DruidException.class,
+ () ->
+ IndexBuilder.create()
+ .schema(
+ IncrementalIndexSchema.builder()
+ .withDimensionsSpec(
+ DimensionsSpec.builder()
+
.setDimensions(
+
ImmutableList.of(
+
new StringDimensionSchema("string"),
+
new LongDimensionSchema("long")
+ )
+ )
+ .build()
+ )
+ .withProjections(
+ ImmutableList.of(
+ new
AggregateProjectionSpec(
+ "mismatched
dims",
+
VirtualColumns.EMPTY,
+ ImmutableList.of(
+ new
LongDimensionSchema("string")
+ ),
+ null
+ )
+ )
+ )
+ .build()
+ ).buildIncrementalIndex()
+ );
+ Assert.assertEquals(
+ "projection[mismatched dims] contains dimension[string] with different
type[LONG] than type[STRING] in base table",
+ t.getMessage()
+ );
+ }
+
+ @Test
+ public void testBadProjectionVirtualColumnNoDimension()
+ {
+ Throwable t = Assert.assertThrows(
+ DruidException.class,
+ () ->
+ IndexBuilder.create()
+ .schema(
+ IncrementalIndexSchema.builder()
+ .withDimensionsSpec(
+ DimensionsSpec.builder()
+
.setDimensions(
+
ImmutableList.of(
+
new StringDimensionSchema("string"),
+
new LongDimensionSchema("long")
+ )
+ )
+ .build()
+ )
+ .withProjections(
+ ImmutableList.of(
+ new
AggregateProjectionSpec(
+ "sad virtual
column",
+
VirtualColumns.create(
+ new
ExpressionVirtualColumn(
+ "v0",
+ "double",
+
ColumnType.DOUBLE,
+
TestExprMacroTable.INSTANCE
+ )
+ ),
+ ImmutableList.of(
+ new
LongDimensionSchema("long")
+ ),
+ null
+ )
+ )
+ )
+ .build()
+ ).buildIncrementalIndex()
+ );
+ Assert.assertEquals(
+ "projection[sad virtual column] contains virtual column[v0] that
references an input[double] which is not a dimension in the base table",
+ t.getMessage()
+ );
+ }
+
+ @Test
+ public void testBadProjectionRollupMismatchedAggType()
+ {
+ Throwable t = Assert.assertThrows(
+ DruidException.class,
+ () ->
+ IndexBuilder.create()
+ .schema(
+ IncrementalIndexSchema.builder()
+ .withDimensionsSpec(
+ DimensionsSpec.builder()
+
.setDimensions(
+
ImmutableList.of(
+
new StringDimensionSchema("string"),
+
new LongDimensionSchema("long")
+ )
+ )
+ .build()
+ )
+ .withRollup(true)
+ .withMetrics(
+ new
DoubleSumAggregatorFactory("sum_double", "sum_double")
+ )
+ .withProjections(
+ ImmutableList.of(
+ new
AggregateProjectionSpec(
+ "mismatched agg",
+
VirtualColumns.EMPTY,
+ ImmutableList.of(
+ new
StringDimensionSchema("string")
+ ),
+ new
AggregatorFactory[] {
+ new
LongSumAggregatorFactory("sum_double", "sum_double")
+ }
+ )
+ )
+ )
+ .build()
+ ).buildIncrementalIndex()
+ );
+ Assert.assertEquals(
+ "projection[mismatched agg] contains aggregator[sum_double] that is
not the 'combining' aggregator of base table aggregator[sum_double]",
+ t.getMessage()
+ );
+ }
+
+ @Test
+ public void testBadProjectionRollupBadAggInput()
+ {
+ Throwable t = Assert.assertThrows(
+ DruidException.class,
+ () ->
+ IndexBuilder.create()
+ .schema(
+ IncrementalIndexSchema.builder()
+ .withDimensionsSpec(
+ DimensionsSpec.builder()
+
.setDimensions(
+
ImmutableList.of(
+
new StringDimensionSchema("string"),
+
new LongDimensionSchema("long")
+ )
+ )
+ .build()
+ )
+ .withRollup(true)
+ .withMetrics(
+ new
DoubleSumAggregatorFactory("double", "double")
+ )
+ .withProjections(
+ ImmutableList.of(
+ new
AggregateProjectionSpec(
+ "renamed agg",
+
VirtualColumns.EMPTY,
+ ImmutableList.of(
+ new
StringDimensionSchema("string")
+ ),
+ new
AggregatorFactory[] {
+ new
LongSumAggregatorFactory("sum_long", "long"),
+ new
DoubleSumAggregatorFactory("sum_double", "double")
+ }
+ )
+ )
+ )
+ .build()
+ ).buildIncrementalIndex()
+ );
+ Assert.assertEquals(
+ "projection[renamed agg] contains aggregator[sum_double] that
references aggregator[double] in base table but this is not supported,
projection aggregators which reference base table aggregates must be
'combining' aggregators with the same name as the base table column",
+ t.getMessage()
+ );
+ }
+
+ @Test
+ public void testBadProjectionVirtualColumnAggInput()
+ {
+ Throwable t = Assert.assertThrows(
+ DruidException.class,
+ () ->
+ IndexBuilder.create()
+ .schema(
+ IncrementalIndexSchema.builder()
+ .withDimensionsSpec(
+ DimensionsSpec.builder()
+
.setDimensions(
+
ImmutableList.of(
+
new StringDimensionSchema("string"),
+
new LongDimensionSchema("long")
+ )
+ )
+ .build()
+ )
+ .withProjections(
+ ImmutableList.of(
+ new
AggregateProjectionSpec(
+ "sad agg virtual
column",
+
VirtualColumns.create(
+ new
ExpressionVirtualColumn(
+ "v0",
+ "long +
100",
+
ColumnType.LONG,
+
TestExprMacroTable.INSTANCE
+ )
+ ),
+ ImmutableList.of(
+ new
LongDimensionSchema("long")
+ ),
+ new
AggregatorFactory[] {
+ new
LongSumAggregatorFactory("v0_sum", "v0")
+ }
+ )
+ )
+ )
+ .build()
+ ).buildIncrementalIndex()
+ );
+ Assert.assertEquals(
+ "projection[sad agg virtual column] contains aggregator[v0_sum] that
is has required field[v0] which is a virtual column, this is not yet supported",
+ t.getMessage()
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]