This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5e76b100b9 Support for DistinctSumMV and DistinctAvgMV aggregation
functions (#10128)
5e76b100b9 is described below
commit 5e76b100b924340b1f401994f3cd84c876377291
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Wed Jan 18 17:09:25 2023 -0800
Support for DistinctSumMV and DistinctAvgMV aggregation functions (#10128)
* Add support for DistinctSumMV and DistinctAverageMV aggregation functions
* Empty commit to retrigger tests
* Address review comments
---
.../query/NonScanBasedAggregationOperator.java | 10 +-
.../pinot/core/plan/AggregationPlanNode.java | 2 +-
.../function/AggregationFunctionFactory.java | 4 +
.../BaseDistinctAggregateAggregationFunction.java | 370 +++++++++++++++++----
.../function/DistinctAvgAggregationFunction.java | 25 +-
....java => DistinctAvgMVAggregationFunction.java} | 31 +-
.../function/DistinctCountAggregationFunction.java | 25 +-
.../DistinctCountMVAggregationFunction.java | 244 +-------------
.../function/DistinctSumAggregationFunction.java | 25 +-
....java => DistinctSumMVAggregationFunction.java} | 31 +-
.../pinot/queries/ExplainPlanQueriesTest.java | 126 ++++---
...terSegmentAggregationMultiValueQueriesTest.java | 84 ++++-
.../pinot/segment/spi/AggregationFunctionType.java | 2 +
13 files changed, 625 insertions(+), 354 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index e036e17a8d..bf7e4e63c2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -100,7 +100,11 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
result = new MinMaxRangePair(getMinValue(dataSource),
getMaxValue(dataSource));
break;
case DISTINCTCOUNT:
+ case DISTINCTSUM:
+ case DISTINCTAVG:
case DISTINCTCOUNTMV:
+ case DISTINCTSUMMV:
+ case DISTINCTAVGMV:
result =
getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary()));
break;
case DISTINCTCOUNTHLL:
@@ -113,12 +117,6 @@ public class NonScanBasedAggregationOperator extends
BaseOperator<AggregationRes
result =
getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
((DistinctCountRawHLLAggregationFunction)
aggregationFunction).getDistinctCountHLLAggregationFunction());
break;
- case DISTINCTSUM:
- result =
getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary()));
- break;
- case DISTINCTAVG:
- result =
getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary()));
- break;
case SEGMENTPARTITIONEDDISTINCTCOUNT:
result = (long)
Objects.requireNonNull(dataSource.getDictionary()).length();
break;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 8ab98970f4..74e5951412 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -56,7 +56,7 @@ public class AggregationPlanNode implements PlanNode {
private static final EnumSet<AggregationFunctionType>
DICTIONARY_BASED_FUNCTIONS =
EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV,
DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL,
DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV,
SEGMENTPARTITIONEDDISTINCTCOUNT,
- DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG);
+ DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV,
DISTINCTAVGMV);
// DISTINCTCOUNT excluded because consuming segment metadata contains
unknown cardinality when there is no dictionary
private static final EnumSet<AggregationFunctionType>
METADATA_BASED_FUNCTIONS =
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 157d1b64da..9b571ff3c4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -266,6 +266,10 @@ public class AggregationFunctionFactory {
return new DistinctCountHLLMVAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLLMV:
return new DistinctCountRawHLLMVAggregationFunction(arguments);
+ case DISTINCTSUMMV:
+ return new DistinctSumMVAggregationFunction(firstArgument);
+ case DISTINCTAVGMV:
+ return new DistinctAvgMVAggregationFunction(firstArgument);
case DISTINCT:
return new DistinctAggregationFunction(arguments,
queryContext.getOrderByExpressions(),
queryContext.getLimit());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java
index 0f9f09a392..0dc2ed35e5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java
@@ -71,7 +71,73 @@ public abstract class
BaseDistinctAggregateAggregationFunction<T extends Compara
}
@Override
- public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ public Set extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ Object result = aggregationResultHolder.getResult();
+ if (result == null) {
+ // Use empty IntOpenHashSet as a place holder for empty result
+ return new IntOpenHashSet();
+ }
+
+ if (result instanceof DictIdsWrapper) {
+ // For dictionary-encoded expression, convert dictionary ids to values
+ return convertToValueSet((DictIdsWrapper) result);
+ } else {
+ // For non-dictionary-encoded expression, directly return the value set
+ return (Set) result;
+ }
+ }
+
+ @Override
+ public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int
groupKey) {
+ Object result = groupByResultHolder.getResult(groupKey);
+ if (result == null) {
+ // NOTE: Return an empty IntOpenHashSet for empty result.
+ return new IntOpenHashSet();
+ }
+
+ if (result instanceof DictIdsWrapper) {
+ // For dictionary-encoded expression, convert dictionary ids to values
+ return convertToValueSet((DictIdsWrapper) result);
+ } else {
+ // For non-dictionary-encoded expression, directly return the value set
+ return (Set) result;
+ }
+ }
+
+ @Override
+ public Set merge(Set intermediateResult1, Set intermediateResult2) {
+ if (intermediateResult1.isEmpty()) {
+ return intermediateResult2;
+ }
+ if (intermediateResult2.isEmpty()) {
+ return intermediateResult1;
+ }
+ intermediateResult1.addAll(intermediateResult2);
+ return intermediateResult1;
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return ColumnDataType.OBJECT;
+ }
+
+ /**
+ * Returns the dictionary id bitmap from the result holder or creates a new
one if it does not exist.
+ */
+ protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder
aggregationResultHolder,
+ Dictionary dictionary) {
+ DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
+ if (dictIdsWrapper == null) {
+ dictIdsWrapper = new DictIdsWrapper(dictionary);
+ aggregationResultHolder.setValue(dictIdsWrapper);
+ }
+ return dictIdsWrapper._dictIdBitmap;
+ }
+
+ /**
+ * Performs aggregation for a SV column
+ */
+ protected void svAggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
@@ -136,8 +202,85 @@ public abstract class
BaseDistinctAggregateAggregationFunction<T extends Compara
}
}
- @Override
- public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ /**
+ * Performs aggregation for a MV column
+ */
+ protected void mvAggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder,
dictionary);
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ dictIdBitmap.add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the value set
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ Set valueSet = getValueSet(aggregationResultHolder, storedType);
+ switch (storedType) {
+ case INT:
+ IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
+ int[][] intValues = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int value : intValues[i]) {
+ intSet.add(value);
+ }
+ }
+ break;
+ case LONG:
+ LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
+ long[][] longValues = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (long value : longValues[i]) {
+ longSet.add(value);
+ }
+ }
+ break;
+ case FLOAT:
+ FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
+ float[][] floatValues = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (float value : floatValues[i]) {
+ floatSet.add(value);
+ }
+ }
+ break;
+ case DOUBLE:
+ DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
+ double[][] doubleValues = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (double value : doubleValues[i]) {
+ doubleSet.add(value);
+ }
+ }
+ break;
+ case STRING:
+ ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>)
valueSet;
+ String[][] stringValues = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ //noinspection ManualArrayToCollectionCopy
+ for (String value : stringValues[i]) {
+ //noinspection UseBulkOperation
+ stringSet.add(value);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for " + _functionType.getName() + " aggregation
function: " + storedType);
+ }
+ }
+
+ /**
+ * Performs aggregation for a SV column with group by on a SV column.
+ */
+ protected void svAggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
@@ -199,8 +342,86 @@ public abstract class
BaseDistinctAggregateAggregationFunction<T extends Compara
}
}
- @Override
- public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ /**
+ * Performs aggregation for a MV column with group by on a SV column.
+ */
+ protected void mvAggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ getDictIdBitmap(groupByResultHolder, groupKeyArray[i],
dictionary).add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store values into the value set
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ switch (storedType) {
+ case INT:
+ int[][] intValues = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ IntOpenHashSet intSet = (IntOpenHashSet)
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT);
+ for (int value : intValues[i]) {
+ intSet.add(value);
+ }
+ }
+ break;
+ case LONG:
+ long[][] longValues = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ LongOpenHashSet longSet = (LongOpenHashSet)
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.LONG);
+ for (long value : longValues[i]) {
+ longSet.add(value);
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValues = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ FloatOpenHashSet floatSet =
+ (FloatOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.FLOAT);
+ for (float value : floatValues[i]) {
+ floatSet.add(value);
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValues = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ DoubleOpenHashSet doubleSet =
+ (DoubleOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.DOUBLE);
+ for (double value : doubleValues[i]) {
+ doubleSet.add(value);
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValues = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ ObjectOpenHashSet<String> stringSet =
+ (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.STRING);
+ //noinspection ManualArrayToCollectionCopy
+ for (String value : stringValues[i]) {
+ //noinspection UseBulkOperation
+ stringSet.add(value);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for " + _functionType.getName() + " aggregation
function: " + storedType);
+ }
+ }
+
+ /**
+ * Performs aggregation for a SV column with group by on a MV column.
+ */
+ protected void svAggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
@@ -259,68 +480,91 @@ public abstract class
BaseDistinctAggregateAggregationFunction<T extends Compara
}
}
- @Override
- public Set extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
- Object result = aggregationResultHolder.getResult();
- if (result == null) {
- // Use empty IntOpenHashSet as a place holder for empty result
- return new IntOpenHashSet();
- }
-
- if (result instanceof DictIdsWrapper) {
- // For dictionary-encoded expression, convert dictionary ids to values
- return convertToValueSet((DictIdsWrapper) result);
- } else {
- // For non-dictionary-encoded expression, directly return the value set
- return (Set) result;
- }
- }
-
- @Override
- public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int
groupKey) {
- Object result = groupByResultHolder.getResult(groupKey);
- if (result == null) {
- // NOTE: Return an empty IntOpenHashSet for empty result.
- return new IntOpenHashSet();
- }
-
- if (result instanceof DictIdsWrapper) {
- // For dictionary-encoded expression, convert dictionary ids to values
- return convertToValueSet((DictIdsWrapper) result);
- } else {
- // For non-dictionary-encoded expression, directly return the value set
- return (Set) result;
- }
- }
+ /**
+ * Performs aggregation for a MV column with group by on a MV column.
+ */
+ protected void mvAggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
- @Override
- public Set merge(Set intermediateResult1, Set intermediateResult2) {
- if (intermediateResult1.isEmpty()) {
- return intermediateResult2;
- }
- if (intermediateResult2.isEmpty()) {
- return intermediateResult1;
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getDictIdBitmap(groupByResultHolder, groupKey,
dictionary).add(dictIds[i]);
+ }
+ }
+ return;
}
- intermediateResult1.addAll(intermediateResult2);
- return intermediateResult1;
- }
- @Override
- public ColumnDataType getIntermediateResultColumnType() {
- return ColumnDataType.OBJECT;
- }
-
- /**
- * Returns the dictionary id bitmap from the result holder or creates a new
one if it does not exist.
- */
- protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder
aggregationResultHolder,
- Dictionary dictionary) {
- DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
- if (dictIdsWrapper == null) {
- dictIdsWrapper = new DictIdsWrapper(dictionary);
- aggregationResultHolder.setValue(dictIdsWrapper);
+ // For non-dictionary-encoded expression, store hash code of the values
into the value set
+ DataType storedType = blockValSet.getValueType().getStoredType();
+ switch (storedType) {
+ case INT:
+ int[][] intValues = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ IntOpenHashSet intSet = (IntOpenHashSet)
getValueSet(groupByResultHolder, groupKey, DataType.INT);
+ for (int value : intValues[i]) {
+ intSet.add(value);
+ }
+ }
+ }
+ break;
+ case LONG:
+ long[][] longValues = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ LongOpenHashSet longSet = (LongOpenHashSet)
getValueSet(groupByResultHolder, groupKey, DataType.LONG);
+ for (long value : longValues[i]) {
+ longSet.add(value);
+ }
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValues = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ FloatOpenHashSet floatSet = (FloatOpenHashSet)
getValueSet(groupByResultHolder, groupKey, DataType.FLOAT);
+ for (float value : floatValues[i]) {
+ floatSet.add(value);
+ }
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValues = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ DoubleOpenHashSet doubleSet =
+ (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.DOUBLE);
+ for (double value : doubleValues[i]) {
+ doubleSet.add(value);
+ }
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValues = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ ObjectOpenHashSet<String> stringSet =
+ (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKey, DataType.STRING);
+ //noinspection ManualArrayToCollectionCopy
+ for (String value : stringValues[i]) {
+ //noinspection UseBulkOperation
+ stringSet.add(value);
+ }
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for " + _functionType.getName() + " aggregation
function: " + storedType);
}
- return dictIdsWrapper._dictIdBitmap;
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
index 41b8550963..bca958957f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
@@ -18,13 +18,18 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+
/**
- * Aggregation function to compute the average of distinct values.
+ * Aggregation function to compute the average of distinct values for an SV
column.
*/
public class DistinctAvgAggregationFunction extends
BaseDistinctAggregateAggregationFunction<Double> {
@@ -32,6 +37,24 @@ public class DistinctAvgAggregationFunction extends
BaseDistinctAggregateAggrega
super(expression, AggregationFunctionType.DISTINCTAVG);
}
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ svAggregate(length, aggregationResultHolder, blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ svAggregateGroupBySV(length, groupKeyArray, groupByResultHolder,
blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ svAggregateGroupByMV(length, groupKeysArray, groupByResultHolder,
blockValSetMap);
+ }
+
@Override
public DataSchema.ColumnDataType getFinalResultColumnType() {
return DataSchema.ColumnDataType.DOUBLE;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgMVAggregationFunction.java
similarity index 55%
copy from
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgMVAggregationFunction.java
index 41b8550963..30a8e00492 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgMVAggregationFunction.java
@@ -18,18 +18,41 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+
/**
- * Aggregation function to compute the average of distinct values.
+ * Aggregation function to compute the average of distinct values for an MV
column.
*/
-public class DistinctAvgAggregationFunction extends
BaseDistinctAggregateAggregationFunction<Double> {
+public class DistinctAvgMVAggregationFunction extends
BaseDistinctAggregateAggregationFunction<Double> {
+
+ public DistinctAvgMVAggregationFunction(ExpressionContext expression) {
+ super(expression, AggregationFunctionType.DISTINCTAVGMV);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ mvAggregate(length, aggregationResultHolder, blockValSetMap);
+ }
- public DistinctAvgAggregationFunction(ExpressionContext expression) {
- super(expression, AggregationFunctionType.DISTINCTAVG);
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ mvAggregateGroupBySV(length, groupKeyArray, groupByResultHolder,
blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ mvAggregateGroupByMV(length, groupKeysArray, groupByResultHolder,
blockValSetMap);
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index 899a2d6ab5..aec983b8df 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -18,13 +18,18 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+
/**
- * Aggregation function to compute the average of distinct values.
+ * Aggregation function to compute the count of distinct values for an SV
column
*/
public class DistinctCountAggregationFunction extends
BaseDistinctAggregateAggregationFunction<Integer> {
@@ -32,6 +37,24 @@ public class DistinctCountAggregationFunction extends
BaseDistinctAggregateAggre
super(expression, AggregationFunctionType.DISTINCTCOUNT);
}
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ svAggregate(length, aggregationResultHolder, blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ svAggregateGroupBySV(length, groupKeyArray, groupByResultHolder,
blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ svAggregateGroupByMV(length, groupKeysArray, groupByResultHolder,
blockValSetMap);
+ }
+
@Override
public ColumnDataType getFinalResultColumnType() {
return ColumnDataType.INT;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
index 6dc65461a9..d8d257b400 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
@@ -18,261 +18,51 @@
*/
package org.apache.pinot.core.query.aggregation.function;
-import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
-import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
-import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class DistinctCountMVAggregationFunction extends
DistinctCountAggregationFunction {
+/**
+ * Aggregation function to compute the count of distinct values for an MV
column
+ */
+public class DistinctCountMVAggregationFunction extends
BaseDistinctAggregateAggregationFunction<Integer> {
public DistinctCountMVAggregationFunction(ExpressionContext expression) {
- super(expression);
+ super(expression, AggregationFunctionType.DISTINCTCOUNTMV);
}
- @Override
- public AggregationFunctionType getType() {
- return AggregationFunctionType.DISTINCTCOUNTMV;
- }
@Override
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- BlockValSet blockValSet = blockValSetMap.get(_expression);
-
- // For dictionary-encoded expression, store dictionary ids into the bitmap
- Dictionary dictionary = blockValSet.getDictionary();
- if (dictionary != null) {
- RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder,
dictionary);
- int[][] dictIds = blockValSet.getDictionaryIdsMV();
- for (int i = 0; i < length; i++) {
- dictIdBitmap.add(dictIds[i]);
- }
- return;
- }
-
- // For non-dictionary-encoded expression, store values into the value set
- DataType storedType = blockValSet.getValueType().getStoredType();
- Set valueSet = getValueSet(aggregationResultHolder, storedType);
- switch (storedType) {
- case INT:
- IntOpenHashSet intSet = (IntOpenHashSet) valueSet;
- int[][] intValues = blockValSet.getIntValuesMV();
- for (int i = 0; i < length; i++) {
- for (int value : intValues[i]) {
- intSet.add(value);
- }
- }
- break;
- case LONG:
- LongOpenHashSet longSet = (LongOpenHashSet) valueSet;
- long[][] longValues = blockValSet.getLongValuesMV();
- for (int i = 0; i < length; i++) {
- for (long value : longValues[i]) {
- longSet.add(value);
- }
- }
- break;
- case FLOAT:
- FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet;
- float[][] floatValues = blockValSet.getFloatValuesMV();
- for (int i = 0; i < length; i++) {
- for (float value : floatValues[i]) {
- floatSet.add(value);
- }
- }
- break;
- case DOUBLE:
- DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet;
- double[][] doubleValues = blockValSet.getDoubleValuesMV();
- for (int i = 0; i < length; i++) {
- for (double value : doubleValues[i]) {
- doubleSet.add(value);
- }
- }
- break;
- case STRING:
- ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>)
valueSet;
- String[][] stringValues = blockValSet.getStringValuesMV();
- for (int i = 0; i < length; i++) {
- //noinspection ManualArrayToCollectionCopy
- for (String value : stringValues[i]) {
- //noinspection UseBulkOperation
- stringSet.add(value);
- }
- }
- break;
- default:
- throw new IllegalStateException("Illegal data type for
DISTINCT_COUNT_MV aggregation function: " + storedType);
- }
+ mvAggregate(length, aggregationResultHolder, blockValSetMap);
}
@Override
public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- BlockValSet blockValSet = blockValSetMap.get(_expression);
-
- // For dictionary-encoded expression, store dictionary ids into the bitmap
- Dictionary dictionary = blockValSet.getDictionary();
- if (dictionary != null) {
- int[][] dictIds = blockValSet.getDictionaryIdsMV();
- for (int i = 0; i < length; i++) {
- getDictIdBitmap(groupByResultHolder, groupKeyArray[i],
dictionary).add(dictIds[i]);
- }
- return;
- }
-
- // For non-dictionary-encoded expression, store values into the value set
- DataType storedType = blockValSet.getValueType().getStoredType();
- switch (storedType) {
- case INT:
- int[][] intValues = blockValSet.getIntValuesMV();
- for (int i = 0; i < length; i++) {
- IntOpenHashSet intSet = (IntOpenHashSet)
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT);
- for (int value : intValues[i]) {
- intSet.add(value);
- }
- }
- break;
- case LONG:
- long[][] longValues = blockValSet.getLongValuesMV();
- for (int i = 0; i < length; i++) {
- LongOpenHashSet longSet = (LongOpenHashSet)
getValueSet(groupByResultHolder, groupKeyArray[i], DataType.LONG);
- for (long value : longValues[i]) {
- longSet.add(value);
- }
- }
- break;
- case FLOAT:
- float[][] floatValues = blockValSet.getFloatValuesMV();
- for (int i = 0; i < length; i++) {
- FloatOpenHashSet floatSet =
- (FloatOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.FLOAT);
- for (float value : floatValues[i]) {
- floatSet.add(value);
- }
- }
- break;
- case DOUBLE:
- double[][] doubleValues = blockValSet.getDoubleValuesMV();
- for (int i = 0; i < length; i++) {
- DoubleOpenHashSet doubleSet =
- (DoubleOpenHashSet) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.DOUBLE);
- for (double value : doubleValues[i]) {
- doubleSet.add(value);
- }
- }
- break;
- case STRING:
- String[][] stringValues = blockValSet.getStringValuesMV();
- for (int i = 0; i < length; i++) {
- ObjectOpenHashSet<String> stringSet =
- (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKeyArray[i], DataType.STRING);
- //noinspection ManualArrayToCollectionCopy
- for (String value : stringValues[i]) {
- //noinspection UseBulkOperation
- stringSet.add(value);
- }
- }
- break;
- default:
- throw new IllegalStateException("Illegal data type for
DISTINCT_COUNT_MV aggregation function: " + storedType);
- }
+ mvAggregateGroupBySV(length, groupKeyArray, groupByResultHolder,
blockValSetMap);
}
@Override
public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- BlockValSet blockValSet = blockValSetMap.get(_expression);
+ mvAggregateGroupByMV(length, groupKeysArray, groupByResultHolder,
blockValSetMap);
+ }
- // For dictionary-encoded expression, store dictionary ids into the bitmap
- Dictionary dictionary = blockValSet.getDictionary();
- if (dictionary != null) {
- int[][] dictIds = blockValSet.getDictionaryIdsMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- getDictIdBitmap(groupByResultHolder, groupKey,
dictionary).add(dictIds[i]);
- }
- }
- return;
- }
+ @Override
+ public DataSchema.ColumnDataType getFinalResultColumnType() {
+ return DataSchema.ColumnDataType.INT;
+ }
- // For non-dictionary-encoded expression, store hash code of the values
into the value set
- DataType storedType = blockValSet.getValueType().getStoredType();
- switch (storedType) {
- case INT:
- int[][] intValues = blockValSet.getIntValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- IntOpenHashSet intSet = (IntOpenHashSet)
getValueSet(groupByResultHolder, groupKey, DataType.INT);
- for (int value : intValues[i]) {
- intSet.add(value);
- }
- }
- }
- break;
- case LONG:
- long[][] longValues = blockValSet.getLongValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- LongOpenHashSet longSet = (LongOpenHashSet)
getValueSet(groupByResultHolder, groupKey, DataType.LONG);
- for (long value : longValues[i]) {
- longSet.add(value);
- }
- }
- }
- break;
- case FLOAT:
- float[][] floatValues = blockValSet.getFloatValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- FloatOpenHashSet floatSet = (FloatOpenHashSet)
getValueSet(groupByResultHolder, groupKey, DataType.FLOAT);
- for (float value : floatValues[i]) {
- floatSet.add(value);
- }
- }
- }
- break;
- case DOUBLE:
- double[][] doubleValues = blockValSet.getDoubleValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- DoubleOpenHashSet doubleSet =
- (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey,
DataType.DOUBLE);
- for (double value : doubleValues[i]) {
- doubleSet.add(value);
- }
- }
- }
- break;
- case STRING:
- String[][] stringValues = blockValSet.getStringValuesMV();
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- ObjectOpenHashSet<String> stringSet =
- (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder,
groupKey, DataType.STRING);
- //noinspection ManualArrayToCollectionCopy
- for (String value : stringValues[i]) {
- //noinspection UseBulkOperation
- stringSet.add(value);
- }
- }
- }
- break;
- default:
- throw new IllegalStateException("Illegal data type for
DISTINCT_COUNT_MV aggregation function: " + storedType);
- }
+ @Override
+ public Integer extractFinalResult(Set intermediateResult) {
+ return intermediateResult.size();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
index b42bc13b08..f3a6c55805 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
@@ -18,13 +18,18 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+
/**
- * Aggregation function to compute the sum of distinct values.
+ * Aggregation function to compute the sum of distinct values for an SV column.
*/
public class DistinctSumAggregationFunction extends
BaseDistinctAggregateAggregationFunction<Double> {
@@ -32,6 +37,24 @@ public class DistinctSumAggregationFunction extends
BaseDistinctAggregateAggrega
super(expression, AggregationFunctionType.DISTINCTSUM);
}
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ svAggregate(length, aggregationResultHolder, blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ svAggregateGroupBySV(length, groupKeyArray, groupByResultHolder,
blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ svAggregateGroupByMV(length, groupKeysArray, groupByResultHolder,
blockValSetMap);
+ }
+
@Override
public DataSchema.ColumnDataType getFinalResultColumnType() {
return DataSchema.ColumnDataType.DOUBLE;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumMVAggregationFunction.java
similarity index 52%
copy from
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumMVAggregationFunction.java
index b42bc13b08..3ffd0acd16 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumMVAggregationFunction.java
@@ -18,18 +18,41 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+
/**
- * Aggregation function to compute the sum of distinct values.
+ * Aggregation function to compute the sum of distinct values for an MV column.
*/
-public class DistinctSumAggregationFunction extends
BaseDistinctAggregateAggregationFunction<Double> {
+public class DistinctSumMVAggregationFunction extends
BaseDistinctAggregateAggregationFunction<Double> {
+
+ public DistinctSumMVAggregationFunction(ExpressionContext expression) {
+ super(expression, AggregationFunctionType.DISTINCTSUMMV);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ mvAggregate(length, aggregationResultHolder, blockValSetMap);
+ }
- public DistinctSumAggregationFunction(ExpressionContext expression) {
- super(expression, AggregationFunctionType.DISTINCTSUM);
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ mvAggregateGroupBySV(length, groupKeyArray, groupByResultHolder,
blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ mvAggregateGroupByMV(length, groupKeysArray, groupByResultHolder,
blockValSetMap);
}
@Override
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index b04cefb875..d74a81fa87 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -108,6 +108,8 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest
{
private final static String COL1_SORTED_INDEX = "sortedIndexCol1";
private final static String COL1_JSON_INDEX = "jsonIndexCol1";
private final static String COL1_TEXT_INDEX = "textIndexCol1";
+ private final static String MV_COL1_RAW = "mvRawCol1";
+ private final static String MV_COL1_NO_INDEX = "mvNoIndexCol1";
private static final Schema SCHEMA = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addSingleValueDimension(COL1_RAW, FieldSpec.DataType.INT)
@@ -123,14 +125,16 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
.addSingleValueDimension(COL3_RANGE_INDEX, FieldSpec.DataType.INT)
.addSingleValueDimension(COL1_SORTED_INDEX, FieldSpec.DataType.DOUBLE)
.addSingleValueDimension(COL1_JSON_INDEX, FieldSpec.DataType.JSON)
- .addSingleValueDimension(COL1_TEXT_INDEX,
FieldSpec.DataType.STRING).build();
+ .addSingleValueDimension(COL1_TEXT_INDEX, FieldSpec.DataType.STRING)
+ .addMultiValueDimension(MV_COL1_RAW, FieldSpec.DataType.INT)
+ .addMultiValueDimension(MV_COL1_NO_INDEX,
FieldSpec.DataType.INT).build();
private static final DataSchema DATA_SCHEMA = new DataSchema(new
String[]{"Operator", "Operator_Id", "Parent_Id"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.INT});
private static final TableConfig TABLE_CONFIG =
- new
TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList(COL1_RAW))
+ new
TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList(COL1_RAW,
MV_COL1_RAW))
.setTableName(RAW_TABLE_NAME).build();
private IndexSegment _indexSegment;
@@ -159,7 +163,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest
{
GenericRow createMockRecord(int noIndexCol1, int noIndexCol2, int
noIndexCol3,
boolean noIndexCol4, double invertedIndexCol1, int invertedIndexCol2,
String intervedIndexCol3,
double rangeIndexCol1, int rangeIndexCol2, int rangeIndexCol3, double
sortedIndexCol1, String jsonIndexCol1,
- String textIndexCol1, int rawCol1) {
+ String textIndexCol1, int rawCol1, Object[] mvRawCol1, Object[]
mvNoIndexCol1) {
GenericRow record = new GenericRow();
record.putValue(COL1_RAW, rawCol1);
@@ -182,6 +186,9 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest
{
record.putValue(COL1_JSON_INDEX, jsonIndexCol1);
record.putValue(COL1_TEXT_INDEX, textIndexCol1);
+ record.putValue(MV_COL1_RAW, mvRawCol1);
+ record.putValue(MV_COL1_NO_INDEX, mvNoIndexCol1);
+
return record;
}
@@ -232,38 +239,49 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
records.add(createMockRecord(1, 2, 3, true, 1.1, 2, "daffy", 10.1, 20, 30,
100.1,
- "{\"first\": \"daffy\", \"last\": " + "\"duck\"}", "daffy", 1));
+ "{\"first\": \"daffy\", \"last\": " + "\"duck\"}", "daffy", 1, new
Object[]{1, 2, 3}, new Object[]{1, 2, 3}));
records.add(createMockRecord(0, 1, 2, false, 0.1, 1, "mickey", 0.1, 10,
20, 100.2,
- "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 0));
+ "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 0, new
Object[]{2, 3, 4},
+ new Object[]{2, 3, 4}));
records.add(createMockRecord(3, 4, 5, true, 2.1, 3, "mickey", 20.1, 30,
40, 100.3,
- "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 3));
+ "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 3, new
Object[]{3, 4, 5},
+ new Object[]{3, 4, 5}));
ImmutableSegment immutableSegment1 = createImmutableSegment(records,
SEGMENT_NAME_1);
List<GenericRow> records2 = new ArrayList<>(NUM_RECORDS);
records2.add(createMockRecord(5, 2, 3, true, 1.1, 2, "pluto", 10.1, 20,
30, 100.1,
- "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 5));
+ "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 5, new
Object[]{100, 200, 300},
+ new Object[]{100, 200, 300}));
records2.add(createMockRecord(6, 1, 2, false, 0.1, 1, "pluto", 0.1, 10,
20, 100.2,
- "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 6));
+ "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 6, new
Object[]{200, 300, 400},
+ new Object[]{200, 300, 400}));
records2.add(createMockRecord(8, 4, 5, true, 2.1, 3, "pluto", 20.1, 30,
40, 100.3,
- "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 8));
+ "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 8, new
Object[]{300, 400, 500},
+ new Object[]{300, 400, 500}));
ImmutableSegment immutableSegment2 = createImmutableSegment(records2,
SEGMENT_NAME_2);
List<GenericRow> records3 = new ArrayList<>(NUM_RECORDS);
records3.add(createMockRecord(5, 2, 3, true, 1.5, 2, "donald", 10.1, 20,
30, 100.1,
- "{\"first\": \"donald\", \"last\": " + "\"duck\"}", "donald", 1));
+ "{\"first\": \"donald\", \"last\": " + "\"duck\"}", "donald", 1, new
Object[]{100, 200, 300},
+ new Object[]{100, 200, 300}));
records3.add(createMockRecord(6, 1, 2, false, 0.1, 1, "goofy", 0.1, 10,
20, 100.2,
- "{\"first\": \"goofy\", \"last\": " + "\"dog\"}", "goofy", 1));
+ "{\"first\": \"goofy\", \"last\": " + "\"dog\"}", "goofy", 1, new
Object[]{100, 200, 300},
+ new Object[]{100, 200, 300}));
records3.add(createMockRecord(7, 4, 5, true, 2.1, 3, "minnie", 20.1, 30,
40, 100.3,
- "{\"first\": \"minnie\", \"last\": " + "\"mouse\"}", "minnie", 1));
+ "{\"first\": \"minnie\", \"last\": " + "\"mouse\"}", "minnie", 1, new
Object[]{1000, 2000, 3000},
+ new Object[]{1000, 2000, 3000}));
ImmutableSegment immutableSegment3 = createImmutableSegment(records3,
SEGMENT_NAME_3);
List<GenericRow> records4 = new ArrayList<>(NUM_RECORDS);
records4.add(createMockRecord(5, 2, 3, true, 1.1, 2, "tweety", 10.1, 20,
30, 100.1,
- "{\"first\": \"tweety\", \"last\": " + "\"bird\"}", "tweety", 5));
+ "{\"first\": \"tweety\", \"last\": " + "\"bird\"}", "tweety", 5, new
Object[]{100, 200, 300},
+ new Object[]{100, 200, 300}));
records4.add(createMockRecord(6, 1, 2, false, 0.1, 1, "bugs", 0.1, 10, 20,
100.2,
- "{\"first\": \"bugs\", \"last\": " + "\"bunny\"}", "bugs", 6));
+ "{\"first\": \"bugs\", \"last\": " + "\"bunny\"}", "bugs", 6, new
Object[]{100, 200, 300},
+ new Object[]{100, 200, 300}));
records4.add(createMockRecord(7, 4, 5, true, 2.1, 3, "sylvester", 20.1,
30, 40, 100.3,
- "{\"first\": \"sylvester\", \"last\": " + "\"cat\"}", "sylvester", 7));
+ "{\"first\": \"sylvester\", \"last\": " + "\"cat\"}", "sylvester", 7,
new Object[]{1000, 2000, 3000},
+ new Object[]{1000, 2000, 3000}));
ImmutableSegment immutableSegment4 = createImmutableSegment(records4,
SEGMENT_NAME_4);
_indexSegment = immutableSegment1;
@@ -371,15 +389,16 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
result1.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)",
ExplainPlanRows.PLAN_START_IDS,
ExplainPlanRows.PLAN_START_IDS});
result1.add(new Object[]{
- "SELECT(selectList:invertedIndexCol1, invertedIndexCol2,
invertedIndexCol3, jsonIndexCol1, "
- + "noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4,
rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, "
- + "rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2});
+ "SELECT(selectList:invertedIndexCol1, invertedIndexCol2,
invertedIndexCol3, jsonIndexCol1, mvNoIndexCol1, "
+ + "mvRawCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4,
rangeIndexCol1, rangeIndexCol2, "
+ + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 3,
2});
result1.add(new Object[]{"TRANSFORM_PASSTHROUGH(invertedIndexCol1,
invertedIndexCol2, invertedIndexCol3, "
- + "jsonIndexCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4,
rangeIndexCol1, rangeIndexCol2, "
- + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 4, 3});
- result1.add(new Object[]{"PROJECT(noIndexCol4, rawCol1, sortedIndexCol1,
noIndexCol3, rangeIndexCol1, "
- + "rangeIndexCol2, invertedIndexCol1, noIndexCol2, invertedIndexCol2,
noIndexCol1, rangeIndexCol3, "
- + "textIndexCol1, jsonIndexCol1, invertedIndexCol3)", 5, 4});
+ + "jsonIndexCol1, mvNoIndexCol1, mvRawCol1, noIndexCol1, noIndexCol2,
noIndexCol3, noIndexCol4, "
+ + "rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, rawCol1,
sortedIndexCol1, textIndexCol1)", 4, 3});
+ result1.add(new Object[]{"PROJECT(noIndexCol4, rawCol1, sortedIndexCol1,
noIndexCol3, mvNoIndexCol1"
+ + ", rangeIndexCol1, rangeIndexCol2, invertedIndexCol1, noIndexCol2,
invertedIndexCol2, noIndexCol1, "
+ + "rangeIndexCol3, textIndexCol1, mvRawCol1, jsonIndexCol1,
invertedIndexCol3)", 5, 4
+ });
result1.add(new Object[]{"DOC_ID_SET", 6, 5});
result1.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
check(query1, new ResultTable(DATA_SCHEMA, result1));
@@ -435,15 +454,19 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
result1.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)",
ExplainPlanRows.PLAN_START_IDS,
ExplainPlanRows.PLAN_START_IDS});
result1.add(new Object[]{
- "SELECT(selectList:invertedIndexCol1, invertedIndexCol2,
invertedIndexCol3, jsonIndexCol1, "
- + "noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4,
rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, "
- + "rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2});
+ "SELECT(selectList:invertedIndexCol1, invertedIndexCol2,
invertedIndexCol3, jsonIndexCol1, mvNoIndexCol1, "
+ + "mvRawCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4,
rangeIndexCol1, rangeIndexCol2, "
+ + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2
+ });
result1.add(new Object[]{"TRANSFORM_PASSTHROUGH(invertedIndexCol1,
invertedIndexCol2, invertedIndexCol3, "
- + "jsonIndexCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4,
rangeIndexCol1, rangeIndexCol2, "
- + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 4, 3});
- result1.add(new Object[]{"PROJECT(noIndexCol4, rawCol1, sortedIndexCol1,
noIndexCol3, rangeIndexCol1, "
- + "rangeIndexCol2, invertedIndexCol1, noIndexCol2, invertedIndexCol2,
noIndexCol1, rangeIndexCol3, "
- + "textIndexCol1, jsonIndexCol1, invertedIndexCol3)", 5, 4});
+ + "jsonIndexCol1, mvNoIndexCol1, mvRawCol1, noIndexCol1, noIndexCol2,
noIndexCol3, noIndexCol4, "
+ + "rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, rawCol1,
sortedIndexCol1, textIndexCol1)", 4, 3
+ });
+ result1.add(new Object[]{
+ "PROJECT(noIndexCol4, rawCol1, sortedIndexCol1, noIndexCol3,
mvNoIndexCol1, "
+ + "rangeIndexCol1, rangeIndexCol2, invertedIndexCol1, noIndexCol2,
invertedIndexCol2, noIndexCol1, "
+ + "rangeIndexCol3, textIndexCol1, mvRawCol1, jsonIndexCol1,
invertedIndexCol3)", 5, 4
+ });
result1.add(new Object[]{"DOC_ID_SET", 6, 5});
result1.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
check(query1, new ResultTable(DATA_SCHEMA, result1));
@@ -1703,19 +1726,42 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
result5.add(new Object[]{"AGGREGATE_NO_SCAN", 3, 2});
check(query5, new ResultTable(DATA_SCHEMA, result5));
- // Full scan required for distinctavg as the column does not have a
dictionary.
- String query6 = "EXPLAIN PLAN FOR SELECT DISTINCTAVG(rawCol1) FROM
testTable";
+ String query6 = "EXPLAIN PLAN FOR SELECT DISTINCTSUMMV(mvNoIndexCol1) FROM
testTable";
List<Object[]> result6 = new ArrayList<>();
result6.add(new Object[]{"BROKER_REDUCE(limit:10)", 1, 0});
result6.add(new Object[]{"COMBINE_AGGREGATE", 2, 1});
- result6.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)",
ExplainPlanRows.PLAN_START_IDS,
- ExplainPlanRows.PLAN_START_IDS});
- result6.add(new Object[]{"AGGREGATE(aggregations:distinctAvg(rawCol1))",
3, 2});
- result6.add(new Object[]{"TRANSFORM_PASSTHROUGH(rawCol1)", 4, 3});
- result6.add(new Object[]{"PROJECT(rawCol1)", 5, 4});
- result6.add(new Object[]{"DOC_ID_SET", 6, 5});
- result6.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
+ result6.add(new Object[]{
+ "PLAN_START(numSegmentsForThisPlan:4)",
ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS
+ });
+ result6.add(new Object[]{"AGGREGATE_NO_SCAN", 3, 2});
check(query6, new ResultTable(DATA_SCHEMA, result6));
+
+ // Full scan required for distinctavg as the column does not have a
dictionary.
+ String query7 = "EXPLAIN PLAN FOR SELECT DISTINCTAVG(rawCol1) FROM
testTable";
+ List<Object[]> result7 = new ArrayList<>();
+ result7.add(new Object[]{"BROKER_REDUCE(limit:10)", 1, 0});
+ result7.add(new Object[]{"COMBINE_AGGREGATE", 2, 1});
+ result7.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)",
ExplainPlanRows.PLAN_START_IDS,
+ ExplainPlanRows.PLAN_START_IDS});
+ result7.add(new Object[]{"AGGREGATE(aggregations:distinctAvg(rawCol1))",
3, 2});
+ result7.add(new Object[]{"TRANSFORM_PASSTHROUGH(rawCol1)", 4, 3});
+ result7.add(new Object[]{"PROJECT(rawCol1)", 5, 4});
+ result7.add(new Object[]{"DOC_ID_SET", 6, 5});
+ result7.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
+ check(query7, new ResultTable(DATA_SCHEMA, result7));
+
+ String query8 = "EXPLAIN PLAN FOR SELECT DISTINCTAVGMV(mvRawCol1) FROM
testTable";
+ List<Object[]> result8 = new ArrayList<>();
+ result8.add(new Object[]{"BROKER_REDUCE(limit:10)", 1, 0});
+ result8.add(new Object[]{"COMBINE_AGGREGATE", 2, 1});
+ result8.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)",
ExplainPlanRows.PLAN_START_IDS,
+ ExplainPlanRows.PLAN_START_IDS});
+ result8.add(new
Object[]{"AGGREGATE(aggregations:distinctAvgMV(mvRawCol1))", 3, 2});
+ result8.add(new Object[]{"TRANSFORM_PASSTHROUGH(mvRawCol1)", 4, 3});
+ result8.add(new Object[]{"PROJECT(mvRawCol1)", 5, 4});
+ result8.add(new Object[]{"DOC_ID_SET", 6, 5});
+ result8.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6});
+ check(query8, new ResultTable(DATA_SCHEMA, result8));
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index 760c1c78c1..4d32358814 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -121,7 +121,8 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
public void testMaxMV() {
String query = "SELECT MAXMV(column6) AS value FROM testTable";
- // Without filter, query should be answered by
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
BrokerResponseNative brokerResponse = getBrokerResponse(query);
DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.DOUBLE});
ResultTable expectedResultTable =
@@ -148,7 +149,8 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
public void testMinMV() {
String query = "SELECT MINMV(column6) AS value FROM testTable";
- // Without filter, query should be answered by
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
BrokerResponseNative brokerResponse = getBrokerResponse(query);
DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.DOUBLE});
Object[] expectedResults = new Object[]{1001.0};
@@ -245,7 +247,8 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
public void testMinMaxRangeMV() {
String query = "SELECT MINMAXRANGEMV(column6) AS value FROM testTable";
- // Without filter, query should be answered by
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
BrokerResponseNative brokerResponse = getBrokerResponse(query);
DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.DOUBLE});
Object[] expectedResults = new Object[]{2147482646.0};
@@ -277,7 +280,8 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
public void testDistinctCountMV() {
String query = "SELECT DISTINCTCOUNTMV(column6) AS value FROM testTable";
- // Without filter, query should be answered by
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
BrokerResponseNative brokerResponse = getBrokerResponse(query);
DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.INT});
Object[] expectedResults = new Object[]{18499};
@@ -305,11 +309,78 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable);
}
+ @Test
+ public void testDistinctSumMV() {
+ String query = "SELECT DISTINCTSUMMV(column6) AS value FROM testTable";
+
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.DOUBLE});
+ Object[] expectedResults = new Object[]{24592775810.0};
+ ResultTable expectedResultTable = new ResultTable(expectedDataSchema,
Collections.singletonList(expectedResults));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L,
400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResults[0] = 2578123532.0;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
62480L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+ expectedResults[0] = 6304833321.0;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+ expectedResults[0] = 2578123532.0;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+ expectedResults[0] = 8999975927.0;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+ expectedResults[0] = 2478539095.0;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable);
+ }
+
+ @Test
+ public void testDistinctAvgMV() {
+ String query = "SELECT DISTINCTAVGMV(column6) AS value FROM testTable";
+
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.DOUBLE});
+ Object[] expectedResults = new Object[]{1329411.0930320558};
+ ResultTable expectedResultTable = new ResultTable(expectedDataSchema,
Collections.singletonList(expectedResults));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L,
400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResults[0] = 2173797.244519393;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
62480L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + SV_GROUP_BY);
+ expectedResults[0] = 2147483647.0;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY);
+ expectedResults[0] = 2147483647.0;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + MV_GROUP_BY);
+ expectedResults[0] = 2147483647.0;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L,
800000L, 400000L, expectedResultTable);
+
+ brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY);
+ expectedResults[0] = 2147483647.0;
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L,
124960L, 400000L, expectedResultTable);
+ }
+
@Test
public void testDistinctCountHLLMV() {
String query = "SELECT DISTINCTCOUNTHLLMV(column6) AS value FROM
testTable";
- // Without filter, query should be answered by
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
BrokerResponseNative brokerResponse = getBrokerResponse(query);
DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.LONG});
Object[] expectedResults = new Object[]{20039L};
@@ -343,7 +414,8 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
Function<Object, Object> cardinalityExtractor =
value ->
ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(BytesUtils.toBytes((String)
value)).cardinality();
- // Without filter, query should be answered by
DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // Without filter, query should be answered by
NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0)
+ // for dictionary based columns.
BrokerResponseNative brokerResponse = getBrokerResponse(query);
DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new
ColumnDataType[]{ColumnDataType.LONG});
Object[] expectedResults = new Object[]{20039L};
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index fc2aec9bc9..8f53122bd4 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -82,6 +82,8 @@ public enum AggregationFunctionType {
DISTINCTCOUNTBITMAPMV("distinctCountBitmapMV"),
DISTINCTCOUNTHLLMV("distinctCountHLLMV"),
DISTINCTCOUNTRAWHLLMV("distinctCountRawHLLMV"),
+ DISTINCTSUMMV("distinctSumMV"),
+ DISTINCTAVGMV("distinctAvgMV"),
PERCENTILEMV("percentileMV"),
PERCENTILEESTMV("percentileEstMV"),
PERCENTILERAWESTMV("percentileRawEstMV"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]