This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2d1f7dc Add functions to return raw results for Percentile TDigest
and Est (#7226)
2d1f7dc is described below
commit 2d1f7dc9060d45ac9f4ff0621da62790ea9b8e64
Author: lakshmanan-v <[email protected]>
AuthorDate: Tue Aug 24 13:54:41 2021 -0700
Add functions to return raw results for Percentile TDigest and Est (#7226)
Adding aggregate functions to return serialized / raw values of
PercentileEst (QuantileDigest) and PercentileTDigest (TDigest) data structures.
---
.../function/AggregationFunctionTypeTest.java | 8 ++
.../function/AggregationFunctionFactory.java | 32 +++++
.../PercentileRawEstAggregationFunction.java | 141 ++++++++++++++++++++
.../PercentileRawEstMVAggregationFunction.java | 43 +++++++
.../PercentileRawTDigestAggregationFunction.java | 142 +++++++++++++++++++++
.../PercentileRawTDigestMVAggregationFunction.java | 43 +++++++
.../function/AggregationFunctionFactoryTest.java | 42 ++++++
.../apache/pinot/queries/ExpectedQueryResult.java | 56 ++++++++
...terSegmentAggregationMultiValueQueriesTest.java | 113 ++++++++++++++++
...erSegmentAggregationSingleValueQueriesTest.java | 118 +++++++++++++++++
.../org/apache/pinot/queries/QueriesTestUtils.java | 105 ++++++++++++---
.../local/aggregator/ValueAggregatorFactory.java | 4 +
.../customobject/SerializedQuantileDigest.java | 50 ++++++++
.../local/customobject/SerializedTDigest.java | 49 +++++++
.../pinot/segment/spi/AggregationFunctionType.java | 12 ++
.../misc/AggregationFunctionColumnPairTest.java | 18 +++
16 files changed, 956 insertions(+), 20 deletions(-)
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index 5b1d640..042381b 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -48,6 +48,10 @@ public class AggregationFunctionTypeTest {
AggregationFunctionType.PERCENTILEEST);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeTdIgEsT99"),
AggregationFunctionType.PERCENTILETDIGEST);
+
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWeSt90mV"),
+ AggregationFunctionType.PERCENTILERAWESTMV);
+
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWtDiGeSt95mV"),
+ AggregationFunctionType.PERCENTILERAWTDIGESTMV);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("CoUnTMv"),
AggregationFunctionType.COUNTMV);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmV"),
AggregationFunctionType.MINMV);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MaXmV"),
AggregationFunctionType.MAXMV);
@@ -67,6 +71,10 @@ public class AggregationFunctionTypeTest {
AggregationFunctionType.PERCENTILEESTMV);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeTdIgEsT95mV"),
AggregationFunctionType.PERCENTILETDIGESTMV);
+
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWeSt50"),
+ AggregationFunctionType.PERCENTILERAWEST);
+
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWtDiGeSt99"),
+ AggregationFunctionType.PERCENTILERAWTDIGEST);
}
@Test(expectedExceptions = IllegalArgumentException.class)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index ccd45fc..3285607 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -61,10 +61,18 @@ public class AggregationFunctionFactory {
// PercentileEst
String percentileString = remainingFunctionName.substring(3);
return new PercentileEstAggregationFunction(firstArgument,
parsePercentileToInt(percentileString));
+ } else if (remainingFunctionName.matches("RAWEST\\d+")) {
+ // PercentileRawEst
+ String percentileString = remainingFunctionName.substring(6);
+ return new PercentileRawEstAggregationFunction(firstArgument,
parsePercentileToInt(percentileString));
} else if (remainingFunctionName.matches("TDIGEST\\d+")) {
// PercentileTDigest
String percentileString = remainingFunctionName.substring(7);
return new PercentileTDigestAggregationFunction(firstArgument,
parsePercentileToInt(percentileString));
+ } else if (remainingFunctionName.matches("RAWTDIGEST\\d+")) {
+ // PercentileRawTDigest
+ String percentileString = remainingFunctionName.substring(10);
+ return new PercentileRawTDigestAggregationFunction(firstArgument,
parsePercentileToInt(percentileString));
} else if (remainingFunctionName.matches("\\d+MV")) {
// PercentileMV
String percentileString = remainingFunctionName.substring(0,
remainingFunctionName.length() - 2);
@@ -73,10 +81,18 @@ public class AggregationFunctionFactory {
// PercentileEstMV
String percentileString = remainingFunctionName.substring(3,
remainingFunctionName.length() - 2);
return new PercentileEstMVAggregationFunction(firstArgument,
parsePercentileToInt(percentileString));
+ } else if (remainingFunctionName.matches("RAWEST\\d+MV")) {
+ // PercentileRawEstMV
+ String percentileString = remainingFunctionName.substring(6,
remainingFunctionName.length() - 2);
+ return new PercentileRawEstMVAggregationFunction(firstArgument,
parsePercentileToInt(percentileString));
} else if (remainingFunctionName.matches("TDIGEST\\d+MV")) {
// PercentileTDigestMV
String percentileString = remainingFunctionName.substring(7,
remainingFunctionName.length() - 2);
return new PercentileTDigestMVAggregationFunction(firstArgument,
parsePercentileToInt(percentileString));
+ } else if (remainingFunctionName.matches("RAWTDIGEST\\d+MV")) {
+ // PercentileRawTDigestMV
+ String percentileString = remainingFunctionName.substring(10,
remainingFunctionName.length() - 2);
+ return new
PercentileRawTDigestMVAggregationFunction(firstArgument,
parsePercentileToInt(percentileString));
}
} else if (numArguments == 2) {
// Double arguments percentile (e.g. percentile(foo, 99),
percentileTDigest(bar, 95), etc.) where the
@@ -90,10 +106,18 @@ public class AggregationFunctionFactory {
// PercentileEst
return new PercentileEstAggregationFunction(firstArgument,
percentile);
}
+ if (remainingFunctionName.equals("RAWEST")) {
+ // PercentileRawEst
+ return new PercentileRawEstAggregationFunction(firstArgument,
percentile);
+ }
if (remainingFunctionName.equals("TDIGEST")) {
// PercentileTDigest
return new PercentileTDigestAggregationFunction(firstArgument,
percentile);
}
+ if (remainingFunctionName.equals("RAWTDIGEST")) {
+ // PercentileRawTDigest
+ return new PercentileRawTDigestAggregationFunction(firstArgument,
percentile);
+ }
if (remainingFunctionName.equals("MV")) {
// PercentileMV
return new PercentileMVAggregationFunction(firstArgument,
percentile);
@@ -102,10 +126,18 @@ public class AggregationFunctionFactory {
// PercentileEstMV
return new PercentileEstMVAggregationFunction(firstArgument,
percentile);
}
+ if (remainingFunctionName.equals("RAWESTMV")) {
+ // PercentileRawEstMV
+ return new PercentileRawEstMVAggregationFunction(firstArgument,
percentile);
+ }
if (remainingFunctionName.equals("TDIGESTMV")) {
// PercentileTDigestMV
return new PercentileTDigestMVAggregationFunction(firstArgument,
percentile);
}
+ if (remainingFunctionName.equals("RAWTDIGESTMV")) {
+ // PercentileRawTDigestMV
+ return new
PercentileRawTDigestMVAggregationFunction(firstArgument, percentile);
+ }
}
throw new IllegalArgumentException("Invalid percentile function: " +
function);
} else {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstAggregationFunction.java
new file mode 100644
index 0000000..dc5d435
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstAggregationFunction.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.QuantileDigest;
+import org.apache.pinot.segment.local.customobject.SerializedQuantileDigest;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code PercentileRawEstAggregationFunction} returns the serialized
{@code QuantileDigest} data structure of the
+ * {@code PercentileEstAggregationFunction}.
+ */
+public class PercentileRawEstAggregationFunction
+ extends BaseSingleInputAggregationFunction<QuantileDigest,
SerializedQuantileDigest> {
+ private final PercentileEstAggregationFunction
_percentileEstAggregationFunction;
+
+ public PercentileRawEstAggregationFunction(ExpressionContext
expressionContext, double percentile) {
+ this(expressionContext, new
PercentileEstAggregationFunction(expressionContext, percentile));
+ }
+
+ public PercentileRawEstAggregationFunction(ExpressionContext
expressionContext, int percentile) {
+ this(expressionContext, new
PercentileEstAggregationFunction(expressionContext, percentile));
+ }
+
+ protected PercentileRawEstAggregationFunction(ExpressionContext expression,
+ PercentileEstAggregationFunction percentileEstAggregationFunction) {
+ super(expression);
+ _percentileEstAggregationFunction = percentileEstAggregationFunction;
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.PERCENTILERAWEST;
+ }
+
+ @Override
+ public String getColumnName() {
+ final double percentile = _percentileEstAggregationFunction._percentile;
+ final int version = _percentileEstAggregationFunction._version;
+ final String type = getType().getName();
+
+ return version == 0 ? type + (int) percentile + "_" + _expression : type +
percentile + "_" + _expression;
+ }
+
+ @Override
+ public String getResultColumnName() {
+ final double percentile = _percentileEstAggregationFunction._percentile;
+ final int version = _percentileEstAggregationFunction._version;
+ final String type = getType().getName().toLowerCase();
+
+ return version == 0 ? type + (int) percentile + "(" + _expression + ")"
+ : type + "(" + _expression + ", " + percentile + ")";
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return _percentileEstAggregationFunction.createAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return
_percentileEstAggregationFunction.createGroupByResultHolder(initialCapacity,
maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _percentileEstAggregationFunction.aggregate(length,
aggregationResultHolder, blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _percentileEstAggregationFunction.aggregateGroupBySV(length,
groupKeyArray, groupByResultHolder, blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _percentileEstAggregationFunction
+ .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder,
blockValSetMap);
+ }
+
+ @Override
+ public QuantileDigest extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ return
_percentileEstAggregationFunction.extractAggregationResult(aggregationResultHolder);
+ }
+
+ @Override
+ public QuantileDigest extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ return
_percentileEstAggregationFunction.extractGroupByResult(groupByResultHolder,
groupKey);
+ }
+
+ @Override
+ public QuantileDigest merge(QuantileDigest intermediateResult1,
QuantileDigest intermediateResult2) {
+ return _percentileEstAggregationFunction.merge(intermediateResult1,
intermediateResult2);
+ }
+
+ @Override
+ public boolean isIntermediateResultComparable() {
+ return _percentileEstAggregationFunction.isIntermediateResultComparable();
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return _percentileEstAggregationFunction.getIntermediateResultColumnType();
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.STRING;
+ }
+
+ @Override
+ public SerializedQuantileDigest extractFinalResult(QuantileDigest
intermediateResult) {
+ return new SerializedQuantileDigest(intermediateResult,
_percentileEstAggregationFunction._percentile);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstMVAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstMVAggregationFunction.java
new file mode 100644
index 0000000..2f2bcfd
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstMVAggregationFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code PercentileRawEstMVAggregationFunction} returns the serialized
{@code QuantileDigest} data structure of the
+ * {@code PercentileEstMVAggregationFunction}.
+ */
+public class PercentileRawEstMVAggregationFunction extends
PercentileRawEstAggregationFunction {
+
+ public PercentileRawEstMVAggregationFunction(ExpressionContext
expressionContext, int percentile) {
+ super(expressionContext, new
PercentileEstMVAggregationFunction(expressionContext, percentile));
+ }
+
+ public PercentileRawEstMVAggregationFunction(ExpressionContext
expressionContext, double percentile) {
+ super(expressionContext, new
PercentileEstMVAggregationFunction(expressionContext, percentile));
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.PERCENTILERAWESTMV;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestAggregationFunction.java
new file mode 100644
index 0000000..2724b21
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestAggregationFunction.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.tdunning.math.stats.TDigest;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.SerializedTDigest;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code PercentileRawTDigestAggregationFunction} returns the serialized
{@code TDigest} data structure of the
+ * {@code PercentileTDigestAggregationFunction}.
+ */
+public class PercentileRawTDigestAggregationFunction
+ extends BaseSingleInputAggregationFunction<TDigest, SerializedTDigest> {
+ private final PercentileTDigestAggregationFunction
_percentileTDigestAggregationFunction;
+
+ public PercentileRawTDigestAggregationFunction(ExpressionContext
expressionContext, int percentile) {
+ this(expressionContext, new
PercentileTDigestAggregationFunction(expressionContext, percentile));
+ }
+
+ public PercentileRawTDigestAggregationFunction(ExpressionContext
expressionContext, double percentile) {
+ this(expressionContext, new
PercentileTDigestAggregationFunction(expressionContext, percentile));
+ }
+
+ protected PercentileRawTDigestAggregationFunction(ExpressionContext
expression,
+ PercentileTDigestAggregationFunction
percentileTDigestAggregationFunction) {
+ super(expression);
+ _percentileTDigestAggregationFunction =
percentileTDigestAggregationFunction;
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.PERCENTILERAWTDIGEST;
+ }
+
+ @Override
+ public String getColumnName() {
+ final double percentile =
_percentileTDigestAggregationFunction._percentile;
+ final int version = _percentileTDigestAggregationFunction._version;
+ final String type = getType().getName();
+
+ return version == 0 ? type + (int) percentile + "_" + _expression : type +
percentile + "_" + _expression;
+ }
+
+ @Override
+ public String getResultColumnName() {
+ final double percentile =
_percentileTDigestAggregationFunction._percentile;
+ final int version = _percentileTDigestAggregationFunction._version;
+ final String type = getType().getName().toLowerCase();
+
+ return version == 0 ? type + (int) percentile + "(" + _expression + ")"
+ : type + "(" + _expression + ", " + percentile + ")";
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return
_percentileTDigestAggregationFunction.createAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return
_percentileTDigestAggregationFunction.createGroupByResultHolder(initialCapacity,
maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _percentileTDigestAggregationFunction.aggregate(length,
aggregationResultHolder, blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _percentileTDigestAggregationFunction
+ .aggregateGroupBySV(length, groupKeyArray, groupByResultHolder,
blockValSetMap);
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ _percentileTDigestAggregationFunction
+ .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder,
blockValSetMap);
+ }
+
+ @Override
+ public TDigest extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ return
_percentileTDigestAggregationFunction.extractAggregationResult(aggregationResultHolder);
+ }
+
+ @Override
+ public TDigest extractGroupByResult(GroupByResultHolder groupByResultHolder,
int groupKey) {
+ return
_percentileTDigestAggregationFunction.extractGroupByResult(groupByResultHolder,
groupKey);
+ }
+
+ @Override
+ public TDigest merge(TDigest intermediateResult1, TDigest
intermediateResult2) {
+ return _percentileTDigestAggregationFunction.merge(intermediateResult1,
intermediateResult2);
+ }
+
+ @Override
+ public boolean isIntermediateResultComparable() {
+ return
_percentileTDigestAggregationFunction.isIntermediateResultComparable();
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return
_percentileTDigestAggregationFunction.getIntermediateResultColumnType();
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.STRING;
+ }
+
+ @Override
+ public SerializedTDigest extractFinalResult(TDigest intermediateResult) {
+ return new SerializedTDigest(intermediateResult,
_percentileTDigestAggregationFunction._percentile);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestMVAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestMVAggregationFunction.java
new file mode 100644
index 0000000..4cf5cbb
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestMVAggregationFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code PercentileRawTDigestMVAggregationFunction} returns the
serialized {@code TDigest} data structure of the
+ * {@code PercentileTDigestMVAggregationFunction}.
+ */
+public class PercentileRawTDigestMVAggregationFunction extends
PercentileRawTDigestAggregationFunction {
+
+ public PercentileRawTDigestMVAggregationFunction(ExpressionContext
expressionContext, int percentile) {
+ super(expressionContext, new
PercentileTDigestMVAggregationFunction(expressionContext, percentile));
+ }
+
+ public PercentileRawTDigestMVAggregationFunction(ExpressionContext
expressionContext, double percentile) {
+ super(expressionContext, new
PercentileTDigestMVAggregationFunction(expressionContext, percentile));
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.PERCENTILERAWTDIGESTMV;
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index b855806..e5e0e22 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -136,6 +136,13 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getColumnName(),
"percentileEst50_column");
assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+ function = getFunction("PeRcEnTiLeRaWEsT50");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
PercentileRawEstAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.PERCENTILERAWEST);
+ assertEquals(aggregationFunction.getColumnName(),
"percentileRawEst50_column");
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
function = getFunction("PeRcEnTiLeTdIgEsT99");
aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof
PercentileTDigestAggregationFunction);
@@ -143,6 +150,13 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getColumnName(),
"percentileTDigest99_column");
assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+ function = getFunction("PeRcEnTiLeRaWTdIgEsT99");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
PercentileRawTDigestAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.PERCENTILERAWTDIGEST);
+ assertEquals(aggregationFunction.getColumnName(),
"percentileRawTDigest99_column");
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
function = getFunction("PeRcEnTiLe", "(column, 5)");
aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof PercentileAggregationFunction);
@@ -164,6 +178,13 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getColumnName(),
"percentileEst50.0_column");
assertEquals(aggregationFunction.getResultColumnName(),
"percentileest(column, 50.0)");
+ function = getFunction("PeRcEnTiLeRaWeSt", "(column, 50)");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
PercentileRawEstAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.PERCENTILERAWEST);
+ assertEquals(aggregationFunction.getColumnName(),
"percentileRawEst50.0_column");
+ assertEquals(aggregationFunction.getResultColumnName(),
"percentilerawest(column, 50.0)");
+
function = getFunction("PeRcEnTiLeEsT", "(column, 55.555)");
aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof
PercentileEstAggregationFunction);
@@ -171,6 +192,13 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getColumnName(),
"percentileEst55.555_column");
assertEquals(aggregationFunction.getResultColumnName(),
"percentileest(column, 55.555)");
+ function = getFunction("PeRcEnTiLeRaWeSt", "(column, 55.555)");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
PercentileRawEstAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.PERCENTILERAWEST);
+ assertEquals(aggregationFunction.getColumnName(),
"percentileRawEst55.555_column");
+ assertEquals(aggregationFunction.getResultColumnName(),
"percentilerawest(column, 55.555)");
+
function = getFunction("PeRcEnTiLeTdIgEsT", "(column, 99)");
aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof
PercentileTDigestAggregationFunction);
@@ -185,6 +213,20 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getColumnName(),
"percentileTDigest99.9999_column");
assertEquals(aggregationFunction.getResultColumnName(),
"percentiletdigest(column, 99.9999)");
+ function = getFunction("PeRcEnTiLeRaWtDiGeSt", "(column, 99)");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
PercentileRawTDigestAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.PERCENTILERAWTDIGEST);
+ assertEquals(aggregationFunction.getColumnName(),
"percentileRawTDigest99.0_column");
+ assertEquals(aggregationFunction.getResultColumnName(),
"percentilerawtdigest(column, 99.0)");
+
+ function = getFunction("PeRcEnTiLeRaWtDiGeSt", "(column, 99.9999)");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
PercentileRawTDigestAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.PERCENTILERAWTDIGEST);
+ assertEquals(aggregationFunction.getColumnName(),
"percentileRawTDigest99.9999_column");
+ assertEquals(aggregationFunction.getResultColumnName(),
"percentilerawtdigest(column, 99.9999)");
+
function = getFunction("CoUnTmV");
aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof CountMVAggregationFunction);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/ExpectedQueryResult.java
b/pinot-core/src/test/java/org/apache/pinot/queries/ExpectedQueryResult.java
new file mode 100644
index 0000000..520f2ee
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExpectedQueryResult.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+public class ExpectedQueryResult<T> {
+ private long _numDocsScanned;
+ private long _numEntriesScannedInFilter;
+ private long _numEntriesScannedPostFilter;
+ private long _numTotalDocs;
+ private T[] _results;
+
+ public ExpectedQueryResult(long numDocsScanned, long
numEntriesScannedInFilter, long numEntriesScannedPostFilter,
+ long numTotalDocs, T[] results) {
+ _numDocsScanned = numDocsScanned;
+ _numEntriesScannedInFilter = numEntriesScannedInFilter;
+ _numEntriesScannedPostFilter = numEntriesScannedPostFilter;
+ _numTotalDocs = numTotalDocs;
+ _results = results;
+ }
+
+ public long getNumDocsScanned() {
+ return _numDocsScanned;
+ }
+
+ public long getNumEntriesScannedInFilter() {
+ return _numEntriesScannedInFilter;
+ }
+
+ public long getNumEntriesScannedPostFilter() {
+ return _numEntriesScannedPostFilter;
+ }
+
+ public long getNumTotalDocs() {
+ return _numTotalDocs;
+ }
+
+ public T[] getResults() {
+ return _results;
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index 4856eab..6c2b730 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -42,6 +42,9 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
private static final String MV_GROUP_BY = " group by column7";
private static final String ORDER_BY_ALIAS = " order by cnt_column6 DESC";
+ // Allow 5% quantile error due to the randomness of TDigest merge
+ private static final double PERCENTILE_TDIGEST_DELTA = 0.05 *
Integer.MAX_VALUE;
+
@Test
public void testCountMV() {
String query = "SELECT COUNTMV(column6) FROM testTable";
@@ -525,6 +528,116 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
}
@Test
+ public void testPercentileRawEst50MV() {
+ testPercentileRawEstAggregationFunction(50);
+ }
+
+ @Test
+ public void testPercentileRawEst90MV() {
+ testPercentileRawEstAggregationFunction(90);
+ }
+
+ @Test
+ public void testPercentileRawEst95MV() {
+ testPercentileRawEstAggregationFunction(95);
+ }
+
+ @Test
+ public void testPercentileRawEst99MV() {
+ testPercentileRawEstAggregationFunction(99);
+ }
+
+ private void testPercentileRawEstAggregationFunction(int percentile) {
+ Function<Serializable, String> quantileExtractor = value -> String.valueOf(
+
ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(BytesUtils.toBytes((String)
value))
+ .getQuantile(percentile / 100.0));
+
+ String rawQuery =
+ String.format("SELECT PERCENTILERAWEST%dMV(column6) FROM testTable",
percentile);
+
+ String query =
+ String.format("SELECT PERCENTILEEST%dMV(column6) FROM testTable",
percentile);
+
+ queryAndTestAggregationResult(rawQuery, getExpectedQueryResults(query),
quantileExtractor);
+
+ queryAndTestAggregationResult(rawQuery + getFilter(),
getExpectedQueryResults(query + getFilter()),
+ quantileExtractor);
+
+ queryAndTestAggregationResult(rawQuery + SV_GROUP_BY,
getExpectedQueryResults(query + SV_GROUP_BY),
+ quantileExtractor);
+
+ queryAndTestAggregationResult(rawQuery + MV_GROUP_BY,
getExpectedQueryResults(query + MV_GROUP_BY),
+ quantileExtractor);
+ }
+
+ private void testPercentileRawTDigestAggregationFunction(int percentile) {
+ Function<Serializable, String> quantileExtractor = value -> String.valueOf(
+
ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) value))
+ .quantile(percentile / 100.0));
+
+ String rawQuery =
+ String.format("SELECT PERCENTILERAWTDIGEST%dMV(column6) FROM
testTable", percentile);
+
+ String query =
+ String.format("SELECT PERCENTILETDIGEST%dMV(column6) FROM testTable",
percentile);
+
+ queryAndTestAggregationResultWithDelta(rawQuery,
getExpectedQueryResults(query), quantileExtractor);
+
+ queryAndTestAggregationResultWithDelta(rawQuery + getFilter(),
getExpectedQueryResults(query + getFilter()),
+ quantileExtractor);
+
+ queryAndTestAggregationResultWithDelta(rawQuery + SV_GROUP_BY,
getExpectedQueryResults(query + SV_GROUP_BY),
+ quantileExtractor);
+
+ queryAndTestAggregationResultWithDelta(rawQuery + MV_GROUP_BY,
getExpectedQueryResults(query + MV_GROUP_BY),
+ quantileExtractor);
+ }
+
+ private ExpectedQueryResult<String> getExpectedQueryResults(String query) {
+ return
QueriesTestUtils.buildExpectedResponse(getBrokerResponseForPqlQuery(query));
+ }
+
+ private void queryAndTestAggregationResultWithDelta(String query,
ExpectedQueryResult<String> expectedQueryResults,
+ Function<Serializable, String> responseMapper) {
+ BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+
+ QueriesTestUtils
+ .testInterSegmentApproximateAggregationResult(brokerResponse,
expectedQueryResults.getNumDocsScanned(),
+ expectedQueryResults.getNumEntriesScannedInFilter(),
expectedQueryResults.getNumEntriesScannedPostFilter(),
+ expectedQueryResults.getNumTotalDocs(), responseMapper,
expectedQueryResults.getResults(),
+ PERCENTILE_TDIGEST_DELTA);
+ }
+
+ private void queryAndTestAggregationResult(String query,
ExpectedQueryResult<String> expectedQueryResults,
+ Function<Serializable, String> responseMapper) {
+ BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+ QueriesTestUtils
+ .testInterSegmentAggregationResult(brokerResponse,
expectedQueryResults.getNumDocsScanned(),
+ expectedQueryResults.getNumEntriesScannedInFilter(),
expectedQueryResults.getNumEntriesScannedPostFilter(),
+ expectedQueryResults.getNumTotalDocs(), responseMapper,
expectedQueryResults.getResults());
+ }
+
+ @Test
+ public void testPercentileRawTDigest50MV() {
+ testPercentileRawTDigestAggregationFunction(50);
+ }
+
+ @Test
+ public void testPercentileRawTDigest90MV() {
+ testPercentileRawTDigestAggregationFunction(90);
+ }
+
+ @Test
+ public void testPercentileRawTDigest95MV() {
+ testPercentileRawTDigestAggregationFunction(95);
+ }
+
+ @Test
+ public void testPercentileRawTDigest99MV() {
+ testPercentileRawTDigestAggregationFunction(99);
+ }
+
+ @Test
public void testNumGroupsLimit() {
String query = "SELECT COUNT(*) FROM testTable GROUP BY column6";
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index 54654ed..b88d38a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -37,6 +37,9 @@ import static org.testng.Assert.assertTrue;
public class InterSegmentAggregationSingleValueQueriesTest extends
BaseSingleValueQueriesTest {
private static final String GROUP_BY = " group by column9";
+ // Allow 5% quantile error due to the randomness of TDigest merge
+ private static final double PERCENTILE_TDIGEST_DELTA = 0.05 *
Integer.MAX_VALUE;
+
@Test
public void testCount() {
String query = "SELECT COUNT(*) FROM testTable";
@@ -413,6 +416,121 @@ public class
InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
}
@Test
+ public void testPercentileRawEst50() {
+ testPercentileRawEstAggregationFunction(50);
+ }
+
+ @Test
+ public void testPercentileRawEst90() {
+ testPercentileRawEstAggregationFunction(90);
+ }
+
+ @Test
+ public void testPercentileRawEst95() {
+ testPercentileRawEstAggregationFunction(95);
+ }
+
+ @Test
+ public void testPercentileRawEst99() {
+ testPercentileRawEstAggregationFunction(99);
+ }
+
+ private void queryAndTestAggregationResult(String query,
ExpectedQueryResult<String> expectedQueryResults,
+ Function<Serializable, String> responseMapper) {
+ BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+ QueriesTestUtils
+ .testInterSegmentAggregationResult(brokerResponse,
expectedQueryResults.getNumDocsScanned(),
+ expectedQueryResults.getNumEntriesScannedInFilter(),
expectedQueryResults.getNumEntriesScannedPostFilter(),
+ expectedQueryResults.getNumTotalDocs(), responseMapper,
expectedQueryResults.getResults());
+ }
+
+ private void testPercentileRawEstAggregationFunction(int percentile) {
+ Function<Serializable, String> quantileExtractor = value -> String.valueOf(
+
ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(BytesUtils.toBytes((String)
value))
+ .getQuantile(percentile / 100.0));
+
+ String rawQuery =
+ String.format("SELECT PERCENTILERAWEST%d(column1),
PERCENTILERAWEST%d(column3) FROM testTable", percentile,
+ percentile);
+
+ String query =
+ String
+ .format("SELECT PERCENTILEEST%d(column1), PERCENTILEEST%d(column3)
FROM testTable", percentile, percentile);
+
+ queryAndTestAggregationResult(rawQuery, getExpectedQueryResults(query),
quantileExtractor);
+
+ queryAndTestAggregationResult(rawQuery + getFilter(),
getExpectedQueryResults(query + getFilter()),
+ quantileExtractor);
+
+ queryAndTestAggregationResult(rawQuery + GROUP_BY,
getExpectedQueryResults(query + GROUP_BY), quantileExtractor);
+
+ queryAndTestAggregationResult(rawQuery + getFilter() + GROUP_BY,
+ getExpectedQueryResults(query + getFilter() + GROUP_BY),
+ quantileExtractor);
+ }
+
+ @Test
+ public void testPercentileRawTDigest50() {
+ testPercentileRawTDigestAggregationFunction(50);
+ }
+
+ @Test
+ public void testPercentileRawTDigest90() {
+ testPercentileRawTDigestAggregationFunction(90);
+ }
+
+ @Test
+ public void testPercentileRawTDigest95() {
+ testPercentileRawTDigestAggregationFunction(95);
+ }
+
+ @Test
+ public void testPercentileRawTDigest99() {
+ testPercentileRawTDigestAggregationFunction(99);
+ }
+
+ private void testPercentileRawTDigestAggregationFunction(int percentile) {
+ Function<Serializable, String> quantileExtractor = value -> String.valueOf(
+
ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) value))
+ .quantile(percentile / 100.0));
+
+ String rawQuery =
+ String.format("SELECT PERCENTILERAWTDIGEST%d(column1),
PERCENTILERAWTDIGEST%d(column3) FROM testTable",
+ percentile, percentile);
+
+ String query =
+ String.format("SELECT PERCENTILETDIGEST%d(column1),
PERCENTILETDIGEST%d(column3) FROM testTable", percentile,
+ percentile);
+
+ queryAndTestAggregationResultWithDelta(rawQuery,
getExpectedQueryResults(query), quantileExtractor);
+
+ queryAndTestAggregationResultWithDelta(rawQuery + getFilter(),
getExpectedQueryResults(query + getFilter()),
+ quantileExtractor);
+
+ queryAndTestAggregationResultWithDelta(rawQuery + GROUP_BY,
getExpectedQueryResults(query + GROUP_BY),
+ quantileExtractor);
+
+ queryAndTestAggregationResultWithDelta(rawQuery + getFilter() + GROUP_BY,
+ getExpectedQueryResults(query + getFilter() + GROUP_BY),
+ quantileExtractor);
+ }
+
+ private ExpectedQueryResult<String> getExpectedQueryResults(String query) {
+ return
QueriesTestUtils.buildExpectedResponse(getBrokerResponseForPqlQuery(query));
+ }
+
+ private void queryAndTestAggregationResultWithDelta(String query,
ExpectedQueryResult<String> expectedQueryResults,
+ Function<Serializable, String> responseMapper) {
+ BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+
+ QueriesTestUtils
+ .testInterSegmentApproximateAggregationResult(brokerResponse,
expectedQueryResults.getNumDocsScanned(),
+ expectedQueryResults.getNumEntriesScannedInFilter(),
expectedQueryResults.getNumEntriesScannedPostFilter(),
+ expectedQueryResults.getNumTotalDocs(), responseMapper,
expectedQueryResults.getResults(),
+ PERCENTILE_TDIGEST_DELTA);
+ }
+
+ @Test
public void testNumGroupsLimit() {
String query = "SELECT COUNT(*) FROM testTable GROUP BY column1";
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
b/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
index 41a1129..2d1a5d1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
@@ -19,6 +19,7 @@
package org.apache.pinot.queries;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -95,11 +96,9 @@ public class QueriesTestUtils {
public static void testInterSegmentAggregationResult(BrokerResponseNative
brokerResponse, long expectedNumDocsScanned,
long expectedNumEntriesScannedInFilter, long
expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs,
Function<Serializable, String> responseMapper, String[]
expectedAggregationResults) {
- Assert.assertEquals(brokerResponse.getNumDocsScanned(),
expectedNumDocsScanned);
- Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(),
expectedNumEntriesScannedInFilter);
- Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
expectedNumEntriesScannedPostFilter);
- Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs);
- List<AggregationResult> aggregationResults =
brokerResponse.getAggregationResults();
+ List<AggregationResult> aggregationResults =
+ validateAggregationStats(brokerResponse, expectedNumDocsScanned,
expectedNumEntriesScannedInFilter,
+ expectedNumEntriesScannedPostFilter, expectedNumTotalDocs);
int length = expectedAggregationResults.length;
Assert.assertEquals(aggregationResults.size(), length);
for (int i = 0; i < length; i++) {
@@ -117,9 +116,82 @@ public class QueriesTestUtils {
}
}
- public static void
testInterSegmentAggregationGroupByResult(BrokerResponseNative brokerResponse,
+ /**
+ * Builds expected response object from the given broker response.
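+ * @param brokerResponse Broker response to read stats and results from.
+ * @return Expected result with the response's stats and stringified values.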
+ */
+ public static ExpectedQueryResult<String>
buildExpectedResponse(BrokerResponseNative brokerResponse) {
+ Function<Serializable, String> responseMapper = Serializable::toString;
+
+ List<AggregationResult> aggregationResults =
brokerResponse.getAggregationResults();
+ List<String> results = new ArrayList<>();
+ for (int i = 0; i < aggregationResults.size(); i++) {
+ AggregationResult aggregationResult = aggregationResults.get(i);
+ Serializable value = aggregationResult.getValue();
+ if (value != null) {
+ results.add(responseMapper.apply(value));
+ // Aggregation.
+ } else {
+ // Group-by.
+
results.add(responseMapper.apply(aggregationResult.getGroupByResult().get(0).getValue()));
+ }
+ }
+ return new ExpectedQueryResult<>(brokerResponse.getNumDocsScanned(),
brokerResponse.getNumEntriesScannedInFilter(),
+ brokerResponse.getNumEntriesScannedPostFilter(),
brokerResponse.getTotalDocs(), results.toArray(new String[0]));
+ }
+
+ /**
+ * Verifies the given results of an approximate aggregation function.
+ * @param brokerResponse Broker response
+ * @param expectedNumDocsScanned Number of documents scanned.
+ * @param expectedNumEntriesScannedInFilter Number of entries scanned in
filter
+ * @param expectedNumEntriesScannedPostFilter Number of entries scanned post
filter.
+ * @param expectedNumTotalDocs Total documents.
+ * @param responseMapper Mapper to process response.
+ * @param expectedAggregationResults Expected aggregation results.
+ * @param resultComparisionDelta Maximum absolute difference allowed between
the actual and the expected result.
+ */
+ public static void
testInterSegmentApproximateAggregationResult(BrokerResponseNative
brokerResponse,
long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter,
long expectedNumEntriesScannedPostFilter,
- long expectedNumTotalDocs, List<String[]> expectedGroupKeys,
List<String[]> expectedAggregationResults) {
+ long expectedNumTotalDocs, Function<Serializable, String>
responseMapper, String[] expectedAggregationResults,
+ double resultComparisionDelta) {
+ List<AggregationResult> aggregationResults =
+ validateAggregationStats(brokerResponse, expectedNumDocsScanned,
expectedNumEntriesScannedInFilter,
+ expectedNumEntriesScannedPostFilter, expectedNumTotalDocs);
+ int length = expectedAggregationResults.length;
+ Assert.assertEquals(aggregationResults.size(), length);
+
+ for (int i = 0; i < length; i++) {
+ AggregationResult aggregationResult = aggregationResults.get(i);
+ double expectedResult =
Double.parseDouble(expectedAggregationResults[i]);
+ Serializable value = aggregationResult.getValue();
+ double actualResult = 0L;
+ if (value != null) {
+ // Aggregation.
+ actualResult = Double.parseDouble(responseMapper.apply(value));
+ } else {
+ // Group-by.
+ actualResult =
Double.parseDouble(responseMapper.apply(aggregationResult.getGroupByResult().get(0).getValue()));
+ }
+ Assert.assertEquals(actualResult, expectedResult,
resultComparisionDelta);
+ }
+ }
+
+ private static List<AggregationResult>
validateAggregationStats(BrokerResponseNative brokerResponse,
+ long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter,
long expectedNumEntriesScannedPostFilter,
+ long expectedNumTotalDocs) {
+ Assert.assertEquals(brokerResponse.getNumDocsScanned(),
expectedNumDocsScanned);
+ Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(),
expectedNumEntriesScannedInFilter);
+ Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
expectedNumEntriesScannedPostFilter);
+ Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs);
+ return brokerResponse.getAggregationResults();
+ }
+
+ public static void
testInterSegmentAggregationGroupByResult(BrokerResponseNative brokerResponse,
+ long expectedNumDocsScanned,
+ long expectedNumEntriesScannedInFilter, long
expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs,
+ List<String[]> expectedGroupKeys, List<String[]>
expectedAggregationResults) {
testInterSegmentAggregationGroupByResult(brokerResponse,
expectedNumDocsScanned, expectedNumEntriesScannedInFilter,
expectedNumEntriesScannedPostFilter, expectedNumTotalDocs,
Serializable::toString, expectedGroupKeys,
expectedAggregationResults);
@@ -129,13 +201,9 @@ public class QueriesTestUtils {
long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter,
long expectedNumEntriesScannedPostFilter,
long expectedNumTotalDocs, Function<Serializable, String>
responseMapper, List<String[]> expectedGroupKeys,
List<String[]> expectedAggregationResults) {
- Assert.assertEquals(brokerResponse.getNumDocsScanned(),
expectedNumDocsScanned);
- Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(),
expectedNumEntriesScannedInFilter);
- Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
expectedNumEntriesScannedPostFilter);
- Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs);
- // size of this array will be equal to number of aggregation functions
since
- // we return each aggregation function separately
- List<AggregationResult> aggregationResults =
brokerResponse.getAggregationResults();
+ List<AggregationResult> aggregationResults =
+ validateAggregationStats(brokerResponse, expectedNumDocsScanned,
expectedNumEntriesScannedInFilter,
+ expectedNumEntriesScannedPostFilter, expectedNumTotalDocs);
int numAggregationColumns = aggregationResults.size();
Assert.assertEquals(numAggregationColumns,
expectedAggregationResults.get(0).length);
int numKeyColumns = expectedGroupKeys.get(0).length;
@@ -181,12 +249,9 @@ public class QueriesTestUtils {
static void testInterSegmentGroupByOrderByResultPQL(BrokerResponseNative
brokerResponse, long expectedNumDocsScanned,
long expectedNumEntriesScannedInFilter, long
expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs,
List<String[]> expectedGroups, List<List<Serializable>> expectedValues,
boolean preserveType) {
- Assert.assertEquals(brokerResponse.getNumDocsScanned(),
expectedNumDocsScanned);
- Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(),
expectedNumEntriesScannedInFilter);
- Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
expectedNumEntriesScannedPostFilter);
- Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs);
-
- List<AggregationResult> aggregationResults =
brokerResponse.getAggregationResults();
+ List<AggregationResult> aggregationResults =
+ validateAggregationStats(brokerResponse, expectedNumDocsScanned,
expectedNumEntriesScannedInFilter,
+ expectedNumEntriesScannedPostFilter, expectedNumTotalDocs);
if (aggregationResults == null) {
Assert.assertEquals(expectedGroups.size(), 0);
Assert.assertEquals(expectedValues.size(), 0);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index 86f85f4..092a285 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -58,8 +58,10 @@ public class ValueAggregatorFactory {
case DISTINCTCOUNTRAWHLL:
return new DistinctCountHLLValueAggregator();
case PERCENTILEEST:
+ case PERCENTILERAWEST:
return new PercentileEstValueAggregator();
case PERCENTILETDIGEST:
+ case PERCENTILERAWTDIGEST:
return new PercentileTDigestValueAggregator();
default:
throw new IllegalStateException("Unsupported aggregation type: " +
aggregationType);
@@ -94,8 +96,10 @@ public class ValueAggregatorFactory {
case DISTINCTCOUNTRAWHLL:
return DistinctCountHLLValueAggregator.AGGREGATED_VALUE_TYPE;
case PERCENTILEEST:
+ case PERCENTILERAWEST:
return PercentileEstValueAggregator.AGGREGATED_VALUE_TYPE;
case PERCENTILETDIGEST:
+ case PERCENTILERAWTDIGEST:
return PercentileTDigestValueAggregator.AGGREGATED_VALUE_TYPE;
default:
throw new IllegalStateException("Unsupported aggregation type: " +
aggregationType);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedQuantileDigest.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedQuantileDigest.java
new file mode 100644
index 0000000..edde4be
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedQuantileDigest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+
+/**
+ * Serialized and comparable version of QuantileDigest. Compares
QuantileDigest for a specific percentile value.
+ */
+public class SerializedQuantileDigest implements
Comparable<SerializedQuantileDigest> {
+ private final double _percentile;
+ private final QuantileDigest _quantileDigest;
+
+ public SerializedQuantileDigest(QuantileDigest quantileDigest, double
percentile) {
+ _quantileDigest = quantileDigest;
+ _percentile = percentile / 100.0;
+ }
+
+ @Override
+ public int compareTo(SerializedQuantileDigest other) {
+ checkArgument(other._percentile == _percentile, "Percentile number doesn't
match!");
+ return Long.compare(_quantileDigest.getQuantile(_percentile),
+ other._quantileDigest.getQuantile(_percentile));
+ }
+
+ @Override
+ public String toString() {
+ return
BytesUtils.toHexString(CustomSerDeUtils.QUANTILE_DIGEST_SER_DE.serialize(_quantileDigest));
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedTDigest.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedTDigest.java
new file mode 100644
index 0000000..24b2281
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedTDigest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import com.tdunning.math.stats.TDigest;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Serialized and comparable version of TDigest. Compares TDigest for a
specific percentile value.
+ */
+public class SerializedTDigest implements Comparable<SerializedTDigest> {
+ private final double _percentile;
+ private final TDigest _tDigest;
+
+ public SerializedTDigest(TDigest tDigest, double percentile) {
+ _tDigest = tDigest;
+ _percentile = percentile / 100.0;
+ }
+
+ @Override
+ public int compareTo(SerializedTDigest other) {
+ checkArgument(other._percentile == _percentile, "Percentile number doesn't
match!");
+ return Double.compare(_tDigest.quantile(_percentile),
other._tDigest.quantile(_percentile));
+ }
+
+ @Override
+ public String toString() {
+ return
BytesUtils.toHexString(CustomSerDeUtils.TDIGEST_SER_DE.serialize(_tDigest));
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 9d10e8a..197239b 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -44,7 +44,9 @@ public enum AggregationFunctionType {
DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch"),
PERCENTILE("percentile"),
PERCENTILEEST("percentileEst"),
+ PERCENTILERAWEST("percentileRawEst"),
PERCENTILETDIGEST("percentileTDigest"),
+ PERCENTILERAWTDIGEST("percentileRawTDigest"),
IDSET("idSet"),
// Geo aggregation functions
@@ -63,7 +65,9 @@ public enum AggregationFunctionType {
DISTINCTCOUNTRAWHLLMV("distinctCountRawHLLMV"),
PERCENTILEMV("percentileMV"),
PERCENTILEESTMV("percentileEstMV"),
+ PERCENTILERAWESTMV("percentileRawEstMV"),
PERCENTILETDIGESTMV("percentileTDigestMV"),
+ PERCENTILERAWTDIGESTMV("percentileRawTDigestMV"),
DISTINCT("distinct");
private final String _name;
@@ -88,14 +92,22 @@ public enum AggregationFunctionType {
return PERCENTILE;
} else if (remainingFunctionName.equals("EST") ||
remainingFunctionName.matches("EST\\d+")) {
return PERCENTILEEST;
+ } else if (remainingFunctionName.equals("RAWEST") ||
remainingFunctionName.matches("RAWEST\\d+")) {
+ return PERCENTILERAWEST;
} else if (remainingFunctionName.equals("TDIGEST") ||
remainingFunctionName.matches("TDIGEST\\d+")) {
return PERCENTILETDIGEST;
+ } else if (remainingFunctionName.equals("RAWTDIGEST") ||
remainingFunctionName.matches("RAWTDIGEST\\d+")) {
+ return PERCENTILERAWTDIGEST;
} else if (remainingFunctionName.equals("MV") ||
remainingFunctionName.matches("\\d+MV")) {
return PERCENTILEMV;
} else if (remainingFunctionName.equals("ESTMV") ||
remainingFunctionName.matches("EST\\d+MV")) {
return PERCENTILEESTMV;
+ } else if (remainingFunctionName.equals("RAWESTMV") ||
remainingFunctionName.matches("RAWEST\\d+MV")) {
+ return PERCENTILERAWESTMV;
} else if (remainingFunctionName.equals("TDIGESTMV") ||
remainingFunctionName.matches("TDIGEST\\d+MV")) {
return PERCENTILETDIGESTMV;
+ } else if (remainingFunctionName.equals("RAWTDIGESTMV") ||
remainingFunctionName.matches("RAWTDIGEST\\d+MV")) {
+ return PERCENTILERAWTDIGESTMV;
} else {
throw new IllegalArgumentException("Invalid aggregation function name:
" + functionName);
}
diff --git
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java
index 659fc4c..b09499a 100644
---
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java
+++
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java
@@ -92,6 +92,15 @@ public class AggregationFunctionColumnPairTest {
Assert.assertEquals(fromColumnName, pair);
Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode());
+ pair = new
AggregationFunctionColumnPair(AggregationFunctionType.PERCENTILERAWEST, COLUMN);
+ Assert.assertEquals(pair.getFunctionType(),
AggregationFunctionType.PERCENTILERAWEST);
+ Assert.assertEquals(pair.getColumn(), COLUMN);
+ columnName = pair.toColumnName();
+ Assert.assertEquals(columnName, "percentileRawEst__column");
+ fromColumnName =
AggregationFunctionColumnPair.fromColumnName("PERCENTILE_RAW_EST__column");
+ Assert.assertEquals(fromColumnName, pair);
+ Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode());
+
pair = new
AggregationFunctionColumnPair(AggregationFunctionType.PERCENTILETDIGEST,
COLUMN);
Assert.assertEquals(pair.getFunctionType(),
AggregationFunctionType.PERCENTILETDIGEST);
Assert.assertEquals(pair.getColumn(), COLUMN);
@@ -100,5 +109,14 @@ public class AggregationFunctionColumnPairTest {
fromColumnName =
AggregationFunctionColumnPair.fromColumnName("percentiletdigest__column");
Assert.assertEquals(fromColumnName, pair);
Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode());
+
+ pair = new
AggregationFunctionColumnPair(AggregationFunctionType.PERCENTILERAWTDIGEST,
COLUMN);
+ Assert.assertEquals(pair.getFunctionType(),
AggregationFunctionType.PERCENTILERAWTDIGEST);
+ Assert.assertEquals(pair.getColumn(), COLUMN);
+ columnName = pair.toColumnName();
+ Assert.assertEquals(columnName, "percentileRawTDigest__column");
+ fromColumnName =
AggregationFunctionColumnPair.fromColumnName("percentilerawtdigest__column");
+ Assert.assertEquals(fromColumnName, pair);
+ Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]