This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 41fbb32846 Add PercentileKLL aggregation function (#10643) 41fbb32846 is described below commit 41fbb32846eb5ec6e656db6e57fa4aaa7177ff9f Author: Caner Balci <canerba...@gmail.com> AuthorDate: Tue May 16 15:55:30 2023 -0700 Add PercentileKLL aggregation function (#10643) * Add KllAggregationFunction for efficient percentile calculation * Fix test * Address review comments * Fix merge issue --- .../apache/pinot/core/common/ObjectSerDeUtils.java | 27 ++- .../function/AggregationFunctionFactory.java | 43 ++++ .../function/PercentileKLLAggregationFunction.java | 255 ++++++++++++++++++++ .../PercentileKLLMVAggregationFunction.java | 126 ++++++++++ .../PercentileRawKLLAggregationFunction.java | 58 +++++ .../PercentileRawKLLMVAggregationFunction.java | 58 +++++ .../function/AggregationFunctionFactoryTest.java | 6 + ...SegmentAggregationMultiValueRawQueriesTest.java | 41 ++++ ...erSegmentAggregationSingleValueQueriesTest.java | 111 +++++++++ .../pinot/queries/PercentileKLLMVQueriesTest.java | 94 ++++++++ .../pinot/queries/PercentileKLLQueriesTest.java | 256 +++++++++++++++++++++ .../segment/local/customobject/SerializedKLL.java | 49 ++++ .../pinot/segment/spi/AggregationFunctionType.java | 12 + 13 files changed, 1135 insertions(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index 9412014cef..14af9bac95 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -57,6 +57,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.datasketches.kll.KllDoublesSketch; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.theta.Sketch; import org.apache.pinot.common.CustomObject; 
@@ -129,7 +130,8 @@ public class ObjectSerDeUtils { CovarianceTuple(32), VarianceTuple(33), PinotFourthMoment(34), - ArgMinMaxObject(35); + ArgMinMaxObject(35), + KllDataSketch(36); private final int _value; @@ -178,6 +180,8 @@ public class ObjectSerDeUtils { return ObjectType.DistinctTable; } else if (value instanceof Sketch) { return ObjectType.DataSketch; + } else if (value instanceof KllDoublesSketch) { + return ObjectType.KllDataSketch; } else if (value instanceof Geometry) { return ObjectType.Geometry; } else if (value instanceof RoaringBitmap) { @@ -922,6 +926,26 @@ public class ObjectSerDeUtils { } }; + public static final ObjectSerDe<KllDoublesSketch> KLL_SKETCH_SER_DE = new ObjectSerDe<KllDoublesSketch>() { + + @Override + public byte[] serialize(KllDoublesSketch value) { + return value.toByteArray(); + } + + @Override + public KllDoublesSketch deserialize(byte[] bytes) { + return KllDoublesSketch.wrap(Memory.wrap(bytes)); + } + + @Override + public KllDoublesSketch deserialize(ByteBuffer byteBuffer) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return KllDoublesSketch.wrap(Memory.wrap(bytes)); + } + }; + public static final ObjectSerDe<Geometry> GEOMETRY_SER_DE = new ObjectSerDe<Geometry>() { @Override @@ -1273,6 +1297,7 @@ public class ObjectSerDeUtils { VARIANCE_TUPLE_OBJECT_SER_DE, PINOT_FOURTH_MOMENT_OBJECT_SER_DE, ARG_MIN_MAX_OBJECT_SER_DE, + KLL_SKETCH_SER_DE, }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index ba3fc837c4..418b864400 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -55,6 +55,17 @@ public class AggregationFunctionFactory 
{ if (remainingFunctionName.equals("SMARTTDIGEST")) { return new PercentileSmartTDigestAggregationFunction(arguments); } + if (remainingFunctionName.contains("KLL")) { + if (remainingFunctionName.equals("KLL")) { + return new PercentileKLLAggregationFunction(arguments); + } else if (remainingFunctionName.equals("KLLMV")) { + return new PercentileKLLMVAggregationFunction(arguments); + } else if (remainingFunctionName.equals("RAWKLL")) { + return new PercentileRawKLLAggregationFunction(arguments); + } else if (remainingFunctionName.equals("RAWKLLMV")) { + return new PercentileRawKLLMVAggregationFunction(arguments); + } + } int numArguments = arguments.size(); if (numArguments == 1) { // Single argument percentile (e.g. Percentile99(foo), PercentileTDigest95(bar), etc.) @@ -77,6 +88,14 @@ public class AggregationFunctionFactory { // PercentileRawTDigest String percentileString = remainingFunctionName.substring(10); return new PercentileRawTDigestAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); + } else if (remainingFunctionName.matches("KLL\\d+")) { + // PercentileKLL + String percentileString = remainingFunctionName.substring(3); + return new PercentileKLLAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); + } else if (remainingFunctionName.matches("RAWKLL\\d+")) { + // PercentileRawKLL + String percentileString = remainingFunctionName.substring(6); + return new PercentileRawKLLAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); } else if (remainingFunctionName.matches("\\d+MV")) { // PercentileMV String percentileString = remainingFunctionName.substring(0, remainingFunctionName.length() - 2); @@ -97,6 +116,14 @@ public class AggregationFunctionFactory { // PercentileRawTDigestMV String percentileString = remainingFunctionName.substring(10, remainingFunctionName.length() - 2); return new PercentileRawTDigestMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); + } else 
if (remainingFunctionName.matches("KLL\\d+MV")) { + // PercentileKLLMV + String percentileString = remainingFunctionName.substring(3, remainingFunctionName.length() - 2); + return new PercentileKLLMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); + } else if (remainingFunctionName.matches("RAWKLL\\d+MV")) { + // PercentileRawKLLMV + String percentileString = remainingFunctionName.substring(6, remainingFunctionName.length() - 2); + return new PercentileRawKLLMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); } } else if (numArguments == 2) { // Double arguments percentile (e.g. percentile(foo, 99), percentileTDigest(bar, 95), etc.) where the @@ -123,6 +150,14 @@ public class AggregationFunctionFactory { // PercentileRawTDigest return new PercentileRawTDigestAggregationFunction(firstArgument, percentile); } + if (remainingFunctionName.equals("KLL")) { + // PercentileKLL + return new PercentileKLLAggregationFunction(firstArgument, percentile); + } + if (remainingFunctionName.equals("RAWKLL")) { + // PercentileRawKLL + return new PercentileRawKLLAggregationFunction(firstArgument, percentile); + } if (remainingFunctionName.equals("MV")) { // PercentileMV return new PercentileMVAggregationFunction(firstArgument, percentile); @@ -143,6 +178,14 @@ public class AggregationFunctionFactory { // PercentileRawTDigestMV return new PercentileRawTDigestMVAggregationFunction(firstArgument, percentile); } + if (remainingFunctionName.equals("KLLMV")) { + // PercentileKLLMV + return new PercentileKLLMVAggregationFunction(firstArgument, percentile); + } + if (remainingFunctionName.equals("RAWKLLMV")) { + // PercentileRawKLLMV + return new PercentileRawKLLMVAggregationFunction(firstArgument, percentile); + } } else if (numArguments == 3) { // Triple arguments percentile (e.g. percentileTDigest(bar, 95, 1000), etc.) 
where the // second argument is a decimal number from 0.0 to 100.0 and third argument is a decimal number indicating diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java new file mode 100644 index 0000000000..3584aafa41 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.aggregation.function; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.datasketches.memory.Memory; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec.DataType; + +/** + * <p> + * {@code PercentileKLLAggregationFunction} provides an approximate percentile calculator using the KLL algorithm + * from <a href="https://datasketches.apache.org/docs/KLL/KLLSketch.html">Apache DataSketches library</a>. + * </p> + * <p> + * The interface is similar to plain 'Percentile' function except for the optional K value which determines + * the size, hence the accuracy of the sketch. + * </p> + * <p><b>PERCENTILE_KLL(col, percentile, kValue)</b></p> + * <p>E.g.:</p> + * <ul> + * <li><b>PERCENTILE_KLL(col, 90)</b></li> + * <li><b>PERCENTILE_KLL(col, 99.9, 800)</b></li> + * </ul> + * + * <p> + * If the column type is BYTES, the aggregation function will assume it is a serialized KllDoubleSketch and will + * attempt to deserialize it for further processing. + * </p> + * + * <p> + * There is a variation of the function (<b>PERCENTILE_RAW_KLL</b>) that returns the Base64 encoded + * sketch object to be used externally. 
+ * </p> + */ +public class PercentileKLLAggregationFunction + extends BaseSingleInputAggregationFunction<KllDoublesSketch, Comparable> { + + protected final double _percentile; + protected int _kValue = 200; // size of the sketch. This is the default size used by DataSketches lib as well + + public PercentileKLLAggregationFunction(List<ExpressionContext> arguments) { + super(arguments.get(0)); + + // Check that there are correct number of arguments + int numArguments = arguments.size(); + Preconditions.checkArgument(numArguments == 2 || numArguments == 3, + "Expecting 2 or 3 arguments for PercentileKLL function: " + + "PERCENTILE_KLL(column, percentile, k=200"); + + _percentile = arguments.get(1).getLiteral().getDoubleValue(); + Preconditions.checkArgument(_percentile >= 0 && _percentile <= 100, + "Percentile value needs to be in range 0-100, inclusive"); + if (numArguments == 3) { + _kValue = arguments.get(2).getLiteral().getIntValue(); + } + } + + public PercentileKLLAggregationFunction(ExpressionContext expression, double percentile) { + super(expression); + _percentile = percentile; + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.PERCENTILEKLL; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet valueSet = blockValSetMap.get(_expression); + DataType valueType = valueSet.getValueType(); + KllDoublesSketch sketch = getOrCreateSketch(aggregationResultHolder); + + if (valueType == DataType.BYTES) { + // Assuming the column contains serialized data sketch + KllDoublesSketch[] 
deserializedSketches = + deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV()); + for (int i = 0; i < length; i++) { + sketch.merge(deserializedSketches[i]); + } + } else { + double[] values = valueSet.getDoubleValuesSV(); + for (int i = 0; i < length; i++) { + sketch.update(values[i]); + } + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet valueSet = blockValSetMap.get(_expression); + DataType valueType = valueSet.getValueType(); + + if (valueType == DataType.BYTES) { + // serialized sketch + KllDoublesSketch[] deserializedSketches = + deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV()); + for (int i = 0; i < length; i++) { + KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]); + sketch.merge(deserializedSketches[i]); + } + } else { + double[] values = valueSet.getDoubleValuesSV(); + for (int i = 0; i < length; i++) { + KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]); + sketch.update(values[i]); + } + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet valueSet = blockValSetMap.get(_expression); + DataType valueType = valueSet.getValueType(); + + if (valueType == DataType.BYTES) { + // serialized sketch + KllDoublesSketch[] deserializedSketches = + deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV()); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey); + sketch.merge(deserializedSketches[i]); + } + } + } else { + double[] values = valueSet.getDoubleValuesSV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + 
KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey); + sketch.update(values[i]); + } + } + } + } + + /** + * Extracts the sketch from the result holder or creates a new one if it does not exist. + */ + protected KllDoublesSketch getOrCreateSketch(AggregationResultHolder aggregationResultHolder) { + KllDoublesSketch sketch = aggregationResultHolder.getResult(); + if (sketch == null) { + sketch = KllDoublesSketch.newHeapInstance(_kValue); + aggregationResultHolder.setValue(sketch); + } + return sketch; + } + + /** + * Extracts the sketch from the group by result holder for key + * or creates a new one if it does not exist. + */ + protected KllDoublesSketch getOrCreateSketch(GroupByResultHolder groupByResultHolder, int groupKey) { + KllDoublesSketch sketch = groupByResultHolder.getResult(groupKey); + if (sketch == null) { + sketch = KllDoublesSketch.newHeapInstance(_kValue); + groupByResultHolder.setValueForKey(groupKey, sketch); + } + return sketch; + } + + /** + * Deserializes the sketches from the bytes. 
+ */ + protected KllDoublesSketch[] deserializeSketches(byte[][] serializedSketches) { + KllDoublesSketch[] sketches = new KllDoublesSketch[serializedSketches.length]; + for (int i = 0; i < serializedSketches.length; i++) { + sketches[i] = KllDoublesSketch.wrap(Memory.wrap(serializedSketches[i])); + } + return sketches; + } + + @Override + public KllDoublesSketch extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + return aggregationResultHolder.getResult(); + } + + @Override + public KllDoublesSketch extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + return groupByResultHolder.getResult(groupKey); + } + + @Override + public KllDoublesSketch merge(KllDoublesSketch sketch1, KllDoublesSketch sketch2) { + KllDoublesSketch union = KllDoublesSketch.newHeapInstance(_kValue); + if (sketch1 != null) { + union.merge(sketch1); + } + if (sketch2 != null) { + union.merge(sketch2); + } + return union; + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return ColumnDataType.OBJECT; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.DOUBLE; + } + + @Override + public String getResultColumnName() { + return AggregationFunctionType.PERCENTILEKLL.getName().toLowerCase() + + "(" + _expression + ", " + _percentile + ")"; + } + + @Override + public Comparable extractFinalResult(KllDoublesSketch sketch) { + return sketch.getQuantile(_percentile / 100); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLMVAggregationFunction.java new file mode 100644 index 0000000000..61d1bd0bce --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLMVAggregationFunction.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.List; +import java.util.Map; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +public class PercentileKLLMVAggregationFunction extends PercentileKLLAggregationFunction { + public PercentileKLLMVAggregationFunction(ExpressionContext expression, double percentile) { + super(expression, percentile); + } + + public PercentileKLLMVAggregationFunction(List<ExpressionContext> arguments) { + super(arguments); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet valueSet = blockValSetMap.get(_expression); + DataType valueType = valueSet.getValueType(); + KllDoublesSketch sketch = getOrCreateSketch(aggregationResultHolder); + + if (valueType == 
DataType.BYTES) { + // Assuming the column contains serialized data sketches + KllDoublesSketch[] deserializedSketches = deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV()); + for (int i = 0; i < length; i++) { + sketch.merge(deserializedSketches[i]); + } + } else { + double[][] values = valueSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + for (double val : values[i]) { + sketch.update(val); + } + } + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet valueSet = blockValSetMap.get(_expression); + DataType valueType = valueSet.getValueType(); + + if (valueType == DataType.BYTES) { + // serialized sketch + KllDoublesSketch[] deserializedSketches = deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV()); + for (int i = 0; i < length; i++) { + KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]); + sketch.merge(deserializedSketches[i]); + } + } else { + double[][] values = valueSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]); + for (double val : values[i]) { + sketch.update(val); + } + } + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet valueSet = blockValSetMap.get(_expression); + DataType valueType = valueSet.getValueType(); + + if (valueType == DataType.BYTES) { + // serialized sketch + KllDoublesSketch[] deserializedSketches = deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV()); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + KllDoublesSketch sketch = this.getOrCreateSketch(groupByResultHolder, groupKey); + 
sketch.merge(deserializedSketches[i]); + } + } + } else { + double[][] values = valueSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey); + for (double val : values[i]) { + sketch.update(val); + } + } + } + } + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.PERCENTILEKLLMV; + } + + @Override + public String getResultColumnName() { + return AggregationFunctionType.PERCENTILEKLLMV.getName().toLowerCase() + + "(" + _expression + ", " + _percentile + ")"; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLAggregationFunction.java new file mode 100644 index 0000000000..48bd421ee3 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLAggregationFunction.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.List; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.local.customobject.SerializedKLL; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class PercentileRawKLLAggregationFunction extends PercentileKLLAggregationFunction { + public PercentileRawKLLAggregationFunction(ExpressionContext expression, double percentile) { + super(expression, percentile); + } + + public PercentileRawKLLAggregationFunction(List<ExpressionContext> arguments) { + super(arguments); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.PERCENTILERAWKLL; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.STRING; + } + + @Override + public String getResultColumnName() { + return AggregationFunctionType.PERCENTILERAWKLL.getName().toLowerCase() + + "(" + _expression + ", " + _percentile + ")"; + } + + @Override + public SerializedKLL extractFinalResult(KllDoublesSketch sketch) { + return new SerializedKLL(sketch, _percentile); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLMVAggregationFunction.java new file mode 100644 index 0000000000..3c0885e970 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLMVAggregationFunction.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.List; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.local.customobject.SerializedKLL; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class PercentileRawKLLMVAggregationFunction extends PercentileKLLMVAggregationFunction { + public PercentileRawKLLMVAggregationFunction(ExpressionContext expression, double percentile) { + super(expression, percentile); + } + + public PercentileRawKLLMVAggregationFunction(List<ExpressionContext> arguments) { + super(arguments); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.PERCENTILERAWKLLMV; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.STRING; + } + + @Override + public String getResultColumnName() { + return AggregationFunctionType.PERCENTILERAWKLLMV.getName().toLowerCase() + + "(" + _expression + ", " + _percentile + ")"; + } + + @Override + public SerializedKLL extractFinalResult(KllDoublesSketch sketch) { + return new SerializedKLL(sketch, _percentile); + } +} diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java index a0e10869e0..a4eaa1f991 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java @@ -281,6 +281,12 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILERAWTDIGEST); assertEquals(aggregationFunction.getResultColumnName(), "percentilerawtdigest(column, 99.9999)"); + function = getFunction("PeRcEntiLEkll", "(column, 99.9999)"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof PercentileKLLAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILEKLL); + assertEquals(aggregationFunction.getResultColumnName(), "percentilekll(column, 99.9999)"); + function = getFunction("PeRcEnTiLeRaWtDiGeSt", "(column, 99.9999, 500)"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); assertTrue(aggregationFunction instanceof PercentileRawTDigestAggregationFunction); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java index fdd9a0ed65..65480aa95f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java @@ -18,10 +18,13 @@ */ package org.apache.pinot.queries; +import 
java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.function.Function; +import org.apache.calcite.avatica.util.Base64; +import org.apache.datasketches.kll.KllDoublesSketch; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; @@ -41,6 +44,8 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa // Allow 5% quantile error due to the randomness of TDigest merge private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * Integer.MAX_VALUE; + // Allow 2% quantile error due to the randomness of KLL merge + private static final double PERCENTILE_KLL_DELTA = 0.02 * Integer.MAX_VALUE; @Test public void testCountMV() { @@ -540,6 +545,42 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa getBrokerResponse(regularQuery + FILTER + MV_GROUP_BY), quantileExtractor, PERCENTILE_TDIGEST_DELTA); } + @Test + public void testPercentileRawKLLMV() { + testPercentileRawKLLMV(50); + testPercentileRawKLLMV(90); + testPercentileRawKLLMV(95); + testPercentileRawKLLMV(99); + } + + private void testPercentileRawKLLMV(int percentile) { + Function<Object, Object> quantileExtractor = + value -> { + try { + KllDoublesSketch sketch = + (KllDoublesSketch) ObjectSerDeUtils.KLL_SKETCH_SER_DE.deserialize(Base64.decode((String) value)); + return sketch.getQuantile(percentile / 100.0); + } catch (IOException e) { + return null; + } + }; + + String rawKllQuery = String.format("SELECT PERCENTILERAWKLL%dMV(column6) AS value FROM testTable", percentile); + String regularQuery = String.format("SELECT PERCENTILE%dMV(column6) AS value FROM testTable", percentile); + QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery), getBrokerResponse(regularQuery), + quantileExtractor, PERCENTILE_KLL_DELTA); + 
QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + FILTER), + getBrokerResponse(regularQuery + FILTER), quantileExtractor, PERCENTILE_KLL_DELTA); + QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + SV_GROUP_BY), + getBrokerResponse(regularQuery + SV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA); + QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + FILTER + SV_GROUP_BY), + getBrokerResponse(regularQuery + FILTER + SV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA); + QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + MV_GROUP_BY), + getBrokerResponse(regularQuery + MV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA); + QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + FILTER + MV_GROUP_BY), + getBrokerResponse(regularQuery + FILTER + MV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA); + } + @Test public void testNumGroupsLimit() { String query = "SELECT COUNT(*) FROM testTable GROUP BY column6"; diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java index 5fcad1307d..a6e8320962 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java @@ -40,6 +40,8 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal // Allow 5% quantile error due to the randomness of TDigest merge private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * Integer.MAX_VALUE; + // Allow 2% quantile error due to the randomness of KLL merge + private static final double PERCENTILE_KLL_DELTA = 0.02 * Integer.MAX_VALUE; @Test public void testCount() { @@ -589,6 +591,115 @@ public class 
InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal getBrokerResponse(regularQuery + FILTER + GROUP_BY), quantileExtractor, PERCENTILE_TDIGEST_DELTA); } + @Test + public void testPercentileKLL() { + String query = "SELECT PERCENTILEKLL(column1, 50) AS v1, PERCENTILEKLL(column3, 50) AS v2 FROM testTable"; /* reassigned below for the 90th, 95th and 99th percentiles */ + + BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = + new DataSchema(new String[]{"v1", "v2"}, new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}); + ResultTable expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1107310944L, 1082130431L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, + 240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1139674505L, 509607935L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, + 49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + GROUP_BY); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, + 360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2142595699L, 334963174L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, + 73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + query = "SELECT PERCENTILEKLL(column1, 90) AS v1, PERCENTILEKLL(column3, 90) AS v2 FROM testTable"; + + brokerResponse = getBrokerResponse(query); + expectedResultTable = + new
ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1946157055L, 1946157055L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, + 240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1939865599L, 902299647L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, + 49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + GROUP_BY); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, + 360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2142595699L, 334963174L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, + 73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + query = "SELECT PERCENTILEKLL(column1, 95) AS v1, PERCENTILEKLL(column3, 95) AS v2 FROM testTable"; + + brokerResponse = getBrokerResponse(query); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2080374783L, 2051014655L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, + 240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2109734911L, 950009855L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, + 49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse =
getBrokerResponse(query + GROUP_BY); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, + 360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2142595699L, 334963174L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, + 73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + query = "SELECT PERCENTILEKLL(column1, 99) AS v1, PERCENTILEKLL(column3, 99) AS v2 FROM testTable"; + + brokerResponse = getBrokerResponse(query); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2143289343L, 2143289343L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, + 240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146232405L, 991952895L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, + 49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + GROUP_BY); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, + 360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA); + + brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY); + expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146232405L, 993001471L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, + 73548L, 120000L,
expectedResultTable, PERCENTILE_KLL_DELTA); + } + @Test public void testNumGroupsLimit() { String query = "SELECT COUNT(*) FROM testTable GROUP BY column1"; diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLMVQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLMVQueriesTest.java new file mode 100644 index 0000000000..40a8b9046d --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLMVQueriesTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.queries; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; + + +/** + * Variation of the PercentileKLLQueriesTest suite which tests PERCENTILE_KLL_MV on a multi-value double column (1 to MAX_NUM_MULTI_VALUES values per row) + */ +public class PercentileKLLMVQueriesTest extends PercentileKLLQueriesTest { + private static final int MAX_NUM_MULTI_VALUES = 10; + + @Override + protected void buildSegment() + throws Exception { + List<GenericRow> rows = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow row = new GenericRow(); + + int numMultiValues = RANDOM.nextInt(MAX_NUM_MULTI_VALUES) + 1; + Double[] values = new Double[numMultiValues]; + KllDoublesSketch sketch = KllDoublesSketch.newHeapInstance(); + for (int j = 0; j < numMultiValues; j++) { + double value = RANDOM.nextDouble() * VALUE_RANGE; + values[j] = value; + sketch.update(value); + } + row.putValue(DOUBLE_COLUMN, values); + row.putValue(KLL_COLUMN, sketch.toByteArray()); + + String group = GROUPS[RANDOM.nextInt(GROUPS.length)]; + row.putValue(GROUP_BY_COLUMN, group); + + rows.add(row); + } + + Schema schema = new Schema(); + schema.addField(new DimensionFieldSpec(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE, false)); + schema.addField(new
MetricFieldSpec(KLL_COLUMN, FieldSpec.DataType.BYTES)); + schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true)); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setOutDir(INDEX_DIR.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(SEGMENT_NAME); + config.setRawIndexCreationColumns(Collections.singletonList(KLL_COLUMN)); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + try (RecordReader recordReader = new GenericRowRecordReader(rows)) { + driver.init(config, recordReader); + driver.build(); + } + } + + @Override + protected String getAggregationQuery(int percentile) { + return String.format("SELECT PERCENTILE%1$dMV(%2$s), PERCENTILEKLL%1$dMV(%2$s), PERCENTILEKLL%1$d(%3$s), " + + "PERCENTILEMV(%2$s, %1$d), PERCENTILEKLLMV(%2$s, %1$d), PERCENTILEKLL(%3$s, %1$d) FROM %4$s", + percentile, DOUBLE_COLUMN, KLL_COLUMN, TABLE_NAME); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLQueriesTest.java new file mode 100644 index 0000000000..05e626e4b4 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLQueriesTest.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import it.unimi.dsi.fastutil.doubles.DoubleList; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import org.apache.commons.io.FileUtils; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.query.AggregationOperator; +import org.apache.pinot.core.operator.query.GroupByOperator; +import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; +import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + + +/** + * Tests for PERCENTILE_KLL aggregation function. + * + * <ul> + * <li>Generates a segment with a double column, a KLL column (one serialized KllDoublesSketch per row) and a group-by column</li> + * <li>Runs aggregation and group-by queries on the generated segment</li> + * <li> + * Compares the results for PERCENTILE_KLL on double column and KLL column with results for PERCENTILE on + * double column + * </li> + * </ul> + */ +public class PercentileKLLQueriesTest extends BaseQueriesTest { + protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PercentileKllQueriesTest"); + protected static final String TABLE_NAME = "testTable"; + protected static final String SEGMENT_NAME = "testSegment"; + + protected static final int NUM_ROWS = 1000; + protected static final int VALUE_RANGE = Integer.MAX_VALUE; + protected static final double DELTA = 0.05 * VALUE_RANGE; // Allow 5% delta + protected static final String DOUBLE_COLUMN = "doubleColumn"; + protected static final String KLL_COLUMN = "kllColumn"; + protected static final String GROUP_BY_COLUMN = "groupByColumn"; + protected static final String[] GROUPS = new String[]{"G1", "G2", "G3"}; + protected static final long RANDOM_SEED = System.nanoTime(); + protected static final Random RANDOM = new Random(RANDOM_SEED); + protected static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED; /* attached to assertion failures so the seed of a failing run is visible */ + + private IndexSegment _indexSegment; + private List<IndexSegment> _indexSegments; + + @Override + protected String getFilter() { + return ""; // No filtering
required for this test. + } + + @Override + protected IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return _indexSegments; + } + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteQuietly(INDEX_DIR); + + buildSegment(); + ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + } + + protected void buildSegment() + throws Exception { + List<GenericRow> rows = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow row = new GenericRow(); + + double value = RANDOM.nextDouble() * VALUE_RANGE; + row.putValue(DOUBLE_COLUMN, value); + + KllDoublesSketch sketch = KllDoublesSketch.newHeapInstance(); + sketch.update(value); + row.putValue(KLL_COLUMN, sketch.toByteArray()); + + String group = GROUPS[RANDOM.nextInt(GROUPS.length)]; + row.putValue(GROUP_BY_COLUMN, group); + + rows.add(row); + } + + Schema schema = new Schema(); + schema.addField(new MetricFieldSpec(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE)); + schema.addField(new MetricFieldSpec(KLL_COLUMN, FieldSpec.DataType.BYTES)); + schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true)); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setOutDir(INDEX_DIR.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(SEGMENT_NAME); + config.setRawIndexCreationColumns(Collections.singletonList(KLL_COLUMN)); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + try (RecordReader recordReader = new GenericRowRecordReader(rows)) { + driver.init(config, recordReader); + driver.build(); + } + } + + @Test + public void
testInnerSegmentAggregation() { + // For inner segment case, percentile does not affect the intermediate result + AggregationOperator aggregationOperator = getOperator(getAggregationQuery(0)); + AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); + List<Object> aggregationResult = resultsBlock.getResults(); + assertNotNull(aggregationResult); + assertEquals(aggregationResult.size(), 6); + DoubleList doubleList0 = (DoubleList) aggregationResult.get(0); + Collections.sort(doubleList0); + assertSketch((KllDoublesSketch) aggregationResult.get(1), doubleList0); + assertSketch((KllDoublesSketch) aggregationResult.get(2), doubleList0); + + DoubleList doubleList3 = (DoubleList) aggregationResult.get(3); + Collections.sort(doubleList3); + assertEquals(doubleList3, doubleList0); + assertSketch((KllDoublesSketch) aggregationResult.get(4), doubleList0); + assertSketch((KllDoublesSketch) aggregationResult.get(5), doubleList0); + } + + @Test + public void testInterSegmentAggregation() { + for (int percentile = 0; percentile <= 100; percentile++) { + BrokerResponseNative brokerResponse = getBrokerResponse(getAggregationQuery(percentile)); + Object[] results = brokerResponse.getResultTable().getRows().get(0); + assertEquals(results.length, 6); + double expectedResult = (Double) results[0]; + for (int i = 1; i < 6; i++) { + assertEquals((Double) results[i], expectedResult, DELTA, ERROR_MESSAGE); + } + } + } + + @Test + public void testInnerSegmentGroupBy() { + // For inner segment case, percentile does not affect the intermediate result + GroupByOperator groupByOperator = getOperator(getGroupByQuery(0)); + GroupByResultsBlock resultsBlock = groupByOperator.nextBlock(); + AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult(); + assertNotNull(groupByResult); + Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupByResult.getGroupKeyIterator(); + while (groupKeyIterator.hasNext()) { + int groupId =
groupKeyIterator.next()._groupId; + DoubleList doubleList0 = (DoubleList) groupByResult.getResultForGroupId(0, groupId); + Collections.sort(doubleList0); + assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(1, groupId), doubleList0); + assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(2, groupId), doubleList0); + + DoubleList doubleList3 = (DoubleList) groupByResult.getResultForGroupId(3, groupId); + Collections.sort(doubleList3); + assertEquals(doubleList3, doubleList0); + assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(4, groupId), doubleList0); + assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(5, groupId), doubleList0); + } + } + + @Test + public void testInterSegmentGroupBy() { + for (int percentile = 0; percentile <= 100; percentile++) { + BrokerResponseNative brokerResponse = getBrokerResponse(getGroupByQuery(percentile)); + List<Object[]> rows = brokerResponse.getResultTable().getRows(); + assertEquals(rows.size(), 3); + for (Object[] row : rows) { + assertEquals(row.length, 6); + double expectedResult = (Double) row[0]; + for (int i = 1; i < 6; i++) { + assertEquals((Double) row[i], expectedResult, DELTA, ERROR_MESSAGE); + } + } + } + } + + protected String getAggregationQuery(int percentile) { + return String.format("SELECT PERCENTILE%1$d(%2$s), PERCENTILEKLL%1$d(%2$s), PERCENTILEKLL%1$d(%3$s), " + + "PERCENTILE(%2$s, %1$d), PERCENTILEKLL(%2$s, %1$d), PERCENTILEKLL(%3$s, %1$d) FROM %4$s", + percentile, DOUBLE_COLUMN, KLL_COLUMN, TABLE_NAME); + } + + private String getGroupByQuery(int percentile) { + return String.format("%s GROUP BY %s", getAggregationQuery(percentile), GROUP_BY_COLUMN); + } + + private void assertSketch(KllDoublesSketch sketch, DoubleList doubleList) { + for (int percentile = 0; percentile <= 100; percentile++) { + double expected; + if (percentile == 100) { + expected = doubleList.getDouble(doubleList.size() - 1); + } else { + expected =
doubleList.getDouble(doubleList.size() * percentile / 100); + } + assertEquals(sketch.getQuantile(percentile / 100.0), expected, DELTA, ERROR_MESSAGE); + } + } + + @AfterClass + public void tearDown() { + _indexSegment.destroy(); + FileUtils.deleteQuietly(INDEX_DIR); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedKLL.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedKLL.java new file mode 100644 index 0000000000..f31f7a198d --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedKLL.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.customobject; + +import java.util.Base64; +import org.apache.datasketches.kll.KllDoublesSketch; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Serialized and comparable version of KllDoublesSketch. + * Compares two sketches for a specific percentile value.
+ */ +public class SerializedKLL implements Comparable<SerializedKLL> { + private final double _quantile; + private final KllDoublesSketch _sketch; + + public SerializedKLL(KllDoublesSketch sketch, double percentile) { + _sketch = sketch; + _quantile = percentile / 100.0; + } + + @Override + public int compareTo(SerializedKLL other) { + checkArgument(other._quantile == _quantile, "Quantile numbers don't match"); + return Double.compare(_sketch.getQuantile(_quantile), other._sketch.getQuantile(_quantile)); + } + + @Override + public String toString() { + return Base64.getEncoder().encodeToString(_sketch.toByteArray()); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 0a2a22dd00..a9d2085c8f 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -58,6 +58,8 @@ public enum AggregationFunctionType { PERCENTILETDIGEST("percentileTDigest"), PERCENTILERAWTDIGEST("percentileRawTDigest"), PERCENTILESMARTTDIGEST("percentileSmartTDigest"), + PERCENTILEKLL("percentileKLL"), + PERCENTILERAWKLL("percentileRawKLL"), IDSET("idSet"), HISTOGRAM("histogram"), COVARPOP("covarPop"), @@ -91,6 +93,8 @@ public enum AggregationFunctionType { PERCENTILERAWESTMV("percentileRawEstMV"), PERCENTILETDIGESTMV("percentileTDigestMV"), PERCENTILERAWTDIGESTMV("percentileRawTDigestMV"), + PERCENTILEKLLMV("percentileKLLMV"), + PERCENTILERAWKLLMV("percentileRawKLLMV"), DISTINCT("distinct"), // boolean aggregate functions @@ -151,6 +155,10 @@ public enum AggregationFunctionType { return PERCENTILETDIGEST; } else if (remainingFunctionName.equals("RAWTDIGEST") || remainingFunctionName.matches("RAWTDIGEST\\d+")) { return PERCENTILERAWTDIGEST; + } else if (remainingFunctionName.equals("KLL") ||
remainingFunctionName.matches("KLL\\d+")) { + return PERCENTILEKLL; + } else if (remainingFunctionName.equals("RAWKLL") || remainingFunctionName.matches("RAWKLL\\d+")) { + return PERCENTILERAWKLL; } else if (remainingFunctionName.equals("MV") || remainingFunctionName.matches("\\d+MV")) { return PERCENTILEMV; } else if (remainingFunctionName.equals("ESTMV") || remainingFunctionName.matches("EST\\d+MV")) { @@ -161,6 +169,10 @@ public enum AggregationFunctionType { return PERCENTILETDIGESTMV; } else if (remainingFunctionName.equals("RAWTDIGESTMV") || remainingFunctionName.matches("RAWTDIGEST\\d+MV")) { return PERCENTILERAWTDIGESTMV; + } else if (remainingFunctionName.equals("KLLMV") || remainingFunctionName.matches("KLL\\d+MV")) { + return PERCENTILEKLLMV; + } else if (remainingFunctionName.equals("RAWKLLMV") || remainingFunctionName.matches("RAWKLL\\d+MV")) { + return PERCENTILERAWKLLMV; } else { throw new IllegalArgumentException("Invalid aggregation function name: " + functionName); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org