This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new da3bce5 Add LASTWITHTIME aggregate function support #7315 (#7584) da3bce5 is described below commit da3bce50eb79656d967da7636a741653735abd63 Author: weixiangsun <91153405+weixiang...@users.noreply.github.com> AuthorDate: Mon Oct 25 22:13:59 2021 -0700 Add LASTWITHTIME aggregate function support #7315 (#7584) Adding aggregate function to return last value of time-based data set. --- .../function/AggregationFunctionTypeTest.java | 2 + .../apache/pinot/core/common/ObjectSerDeUtils.java | 122 ++++- .../function/AggregationFunctionFactory.java | 31 ++ ...LastDoubleValueWithTimeAggregationFunction.java | 126 +++++ .../LastFloatValueWithTimeAggregationFunction.java | 127 +++++ .../LastIntValueWithTimeAggregationFunction.java | 142 ++++++ .../LastLongValueWithTimeAggregationFunction.java | 126 +++++ ...LastStringValueWithTimeAggregationFunction.java | 124 +++++ .../function/LastWithTimeAggregationFunction.java | 227 +++++++++ .../BrokerRequestToQueryContextConverter.java | 7 + .../pinot/core/common/ObjectSerDeUtilsTest.java | 76 +++ .../function/AggregationFunctionFactoryTest.java | 42 ++ .../pinot/queries/LastWithTimeQueriesTest.java | 548 +++++++++++++++++++++ .../segment/local/customobject/DoubleLongPair.java | 45 ++ .../segment/local/customobject/FloatLongPair.java | 45 ++ .../segment/local/customobject/IntLongPair.java | 45 ++ .../segment/local/customobject/LongLongPair.java | 45 ++ .../segment/local/customobject/StringLongPair.java | 50 ++ .../segment/local/customobject/ValueLongPair.java | 50 ++ .../pinot/segment/spi/AggregationFunctionType.java | 1 + 20 files changed, 1979 insertions(+), 2 deletions(-) diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java index 042381b..ea9f69a 100644 --- 
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java @@ -33,6 +33,8 @@ public class AggregationFunctionTypeTest { Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("SuM"), AggregationFunctionType.SUM); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("AvG"), AggregationFunctionType.AVG); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MoDe"), AggregationFunctionType.MODE); + Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("LaStWiThTiMe"), + AggregationFunctionType.LASTWITHTIME); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmAxRaNgE"), AggregationFunctionType.MINMAXRANGE); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnT"), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index 3341a7c..e6ed491 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -62,8 +62,13 @@ import org.apache.pinot.core.query.distinct.DistinctTable; import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.query.utils.idset.IdSets; import org.apache.pinot.segment.local.customobject.AvgPair; +import org.apache.pinot.segment.local.customobject.DoubleLongPair; +import org.apache.pinot.segment.local.customobject.FloatLongPair; +import org.apache.pinot.segment.local.customobject.IntLongPair; +import org.apache.pinot.segment.local.customobject.LongLongPair; import org.apache.pinot.segment.local.customobject.MinMaxRangePair; import org.apache.pinot.segment.local.customobject.QuantileDigest; +import 
org.apache.pinot.segment.local.customobject.StringLongPair; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.spi.utils.BigDecimalUtils; import org.apache.pinot.spi.utils.ByteArray; @@ -109,7 +114,12 @@ public class ObjectSerDeUtils { Int2LongMap(23), Long2LongMap(24), Float2LongMap(25), - Double2LongMap(26); + Double2LongMap(26), + IntLongPair(27), + LongLongPair(28), + FloatLongPair(29), + DoubleLongPair(30), + StringLongPair(31); private final int _value; ObjectType(int value) { @@ -178,6 +188,16 @@ public class ObjectSerDeUtils { return ObjectType.IdSet; } else if (value instanceof List) { return ObjectType.List; + } else if (value instanceof IntLongPair) { + return ObjectType.IntLongPair; + } else if (value instanceof LongLongPair) { + return ObjectType.LongLongPair; + } else if (value instanceof FloatLongPair) { + return ObjectType.FloatLongPair; + } else if (value instanceof DoubleLongPair) { + return ObjectType.DoubleLongPair; + } else if (value instanceof StringLongPair) { + return ObjectType.StringLongPair; } else { throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName()); } @@ -330,6 +350,99 @@ public class ObjectSerDeUtils { } }; + public static final ObjectSerDe<IntLongPair> INT_LONG_PAIR_SER_DE + = new ObjectSerDe<IntLongPair>() { + + @Override + public byte[] serialize(IntLongPair intLongPair) { + return intLongPair.toBytes(); + } + + @Override + public IntLongPair deserialize(byte[] bytes) { + return IntLongPair.fromBytes(bytes); + } + + @Override + public IntLongPair deserialize(ByteBuffer byteBuffer) { + return IntLongPair.fromByteBuffer(byteBuffer); + } + }; + + public static final ObjectSerDe<LongLongPair> LONG_LONG_PAIR_SER_DE + = new ObjectSerDe<LongLongPair>() { + + @Override + public byte[] serialize(LongLongPair longLongPair) { + return longLongPair.toBytes(); + } + + @Override + public LongLongPair deserialize(byte[] bytes) { + return 
LongLongPair.fromBytes(bytes); + } + + @Override + public LongLongPair deserialize(ByteBuffer byteBuffer) { + return LongLongPair.fromByteBuffer(byteBuffer); + } + }; + + public static final ObjectSerDe<FloatLongPair> FLOAT_LONG_PAIR_SER_DE + = new ObjectSerDe<FloatLongPair>() { + + @Override + public byte[] serialize(FloatLongPair floatLongPair) { + return floatLongPair.toBytes(); + } + + @Override + public FloatLongPair deserialize(byte[] bytes) { + return FloatLongPair.fromBytes(bytes); + } + + @Override + public FloatLongPair deserialize(ByteBuffer byteBuffer) { + return FloatLongPair.fromByteBuffer(byteBuffer); + } + }; + public static final ObjectSerDe<DoubleLongPair> DOUBLE_LONG_PAIR_SER_DE + = new ObjectSerDe<DoubleLongPair>() { + + @Override + public byte[] serialize(DoubleLongPair doubleLongPair) { + return doubleLongPair.toBytes(); + } + + @Override + public DoubleLongPair deserialize(byte[] bytes) { + return DoubleLongPair.fromBytes(bytes); + } + + @Override + public DoubleLongPair deserialize(ByteBuffer byteBuffer) { + return DoubleLongPair.fromByteBuffer(byteBuffer); + } + }; + public static final ObjectSerDe<StringLongPair> STRING_LONG_PAIR_SER_DE + = new ObjectSerDe<StringLongPair>() { + + @Override + public byte[] serialize(StringLongPair stringLongPair) { + return stringLongPair.toBytes(); + } + + @Override + public StringLongPair deserialize(byte[] bytes) { + return StringLongPair.fromBytes(bytes); + } + + @Override + public StringLongPair deserialize(ByteBuffer byteBuffer) { + return StringLongPair.fromByteBuffer(byteBuffer); + } + }; + public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new ObjectSerDe<HyperLogLog>() { @Override @@ -1047,7 +1160,12 @@ public class ObjectSerDeUtils { INT_2_LONG_MAP_SER_DE, LONG_2_LONG_MAP_SER_DE, FLOAT_2_LONG_MAP_SER_DE, - DOUBLE_2_LONG_MAP_SER_DE + DOUBLE_2_LONG_MAP_SER_DE, + INT_LONG_PAIR_SER_DE, + LONG_LONG_PAIR_SER_DE, + FLOAT_LONG_PAIR_SER_DE, + DOUBLE_LONG_PAIR_SER_DE, + 
STRING_LONG_PAIR_SER_DE }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 3285607..df26e16 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -25,6 +25,7 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FunctionContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.exception.BadQueryRequestException; @@ -156,6 +157,36 @@ public class AggregationFunctionFactory { return new AvgAggregationFunction(firstArgument); case MODE: return new ModeAggregationFunction(arguments); + case LASTWITHTIME: + if (arguments.size() == 3) { + ExpressionContext timeCol = arguments.get(1); + ExpressionContext dataType = arguments.get(2); + if (dataType.getType() != ExpressionContext.Type.LITERAL) { + throw new IllegalArgumentException("Third argument of lastWithTime Function should be literal." 
+ + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')"); + } + FieldSpec.DataType fieldDataType + = FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase()); + switch (fieldDataType) { + case BOOLEAN: + case INT: + return new LastIntValueWithTimeAggregationFunction( + firstArgument, timeCol, fieldDataType == FieldSpec.DataType.BOOLEAN); + case LONG: + return new LastLongValueWithTimeAggregationFunction(firstArgument, timeCol); + case FLOAT: + return new LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol); + case DOUBLE: + return new LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol); + case STRING: + return new LastStringValueWithTimeAggregationFunction(firstArgument, timeCol); + default: + throw new IllegalArgumentException("Unsupported Value Type for lastWithTime Function:" + dataType); + } + } else { + throw new IllegalArgumentException("Three arguments are required for lastWithTime Function." + + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')"); + } case MINMAXRANGE: return new MinMaxRangeAggregationFunction(firstArgument); case DISTINCTCOUNT: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java new file mode 100644 index 0000000..796a0cb --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.DoubleLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for LastWithTime calculations for data column with double type. 
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the double data column to be calculated last on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> { + + private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR + = new DoubleLongPair(Double.NaN, Long.MIN_VALUE); + + public LastDoubleValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol) { + super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE); + } + + @Override + public ValueLongPair<Double> constructValueLongPair(Double value, long time) { + return new DoubleLongPair(value, time); + } + + @Override + public ValueLongPair<Double> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair(); + Double lastData = defaultValueLongPair.getValue(); + long lastTime = defaultValueLongPair.getTime(); + double[] doubleValues = blockValSet.getDoubleValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + double data = doubleValues[i]; + long time = timeValues[i]; + if (time >= lastTime) { + lastTime = time; + lastData = data; + } + } + setAggregationResult(aggregationResultHolder, lastData, lastTime); + } + + @Override + public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) 
{ + double[] doubleValues = blockValSet.getDoubleValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + double data = doubleValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + double[] doubleValues = blockValSet.getDoubleValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + double value = doubleValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'DOUBLE')"; + } + + @Override + public String getColumnName() { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_DOUBLE"; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.DOUBLE; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java new file mode 100644 index 0000000..6061a83 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.FloatLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for LastWithTime calculations for data column with float type. 
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'float') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the float data column to be calculated last on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class LastFloatValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Float> { + + private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new FloatLongPair(Float.NaN, Long.MIN_VALUE); + + public LastFloatValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol) { + super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE); + } + + @Override + public ValueLongPair<Float> constructValueLongPair(Float value, long time) { + return new FloatLongPair(value, time); + } + + @Override + public ValueLongPair<Float> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair(); + Float lastData = defaultValueLongPair.getValue(); + long lastTime = defaultValueLongPair.getTime(); + float[] floatValues = blockValSet.getFloatValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + float data = floatValues[i]; + long time = timeValues[i]; + if (time >= lastTime) { + lastTime = time; + lastData = data; + } + } + setAggregationResult(aggregationResultHolder, lastData, lastTime); + } + + @Override + public void aggregateGroupResultWithRawDataSv(int length, + int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + float[] 
floatValues = blockValSet.getFloatValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + float data = floatValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + float[] floatValues = blockValSet.getFloatValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + float value = floatValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'FLOAT')"; + } + + @Override + public String getColumnName() { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_FLOAT"; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.FLOAT; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java new file mode 100644 index 0000000..ff9fdee --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.IntLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for LastWithTime calculations for data column with int/boolean type. 
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'int') + * or LastWithTime(dataExpression, timeExpression, 'boolean') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the int/boolean data column to be calculated last on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class LastIntValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Integer> { + + private final static ValueLongPair<Integer> DEFAULT_VALUE_TIME_PAIR + = new IntLongPair(Integer.MIN_VALUE, Long.MIN_VALUE); + private final boolean _isBoolean; + + public LastIntValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol, + boolean isBoolean) { + super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE); + _isBoolean = isBoolean; + } + + @Override + public ValueLongPair<Integer> constructValueLongPair(Integer value, long time) { + return new IntLongPair(value, time); + } + + @Override + public ValueLongPair<Integer> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair(); + Integer lastData = defaultValueLongPair.getValue(); + long lastTime = defaultValueLongPair.getTime(); + int[] intValues = blockValSet.getIntValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + int data = intValues[i]; + long time = timeValues[i]; + if (time >= lastTime) { + lastTime = time; + lastData = data; + } + } + setAggregationResult(aggregationResultHolder, lastData, lastTime); + } + + @Override + public void 
aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + int[] intValues = blockValSet.getIntValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + int data = intValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + int[] intValues = blockValSet.getIntValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + int value = intValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + if (_isBoolean) { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'BOOLEAN')"; + } else { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'INT')"; + } + } + + @Override + public String getColumnName() { + if (_isBoolean) { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_BOOLEAN"; + } else { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_INT"; + } + } + + @Override + public ColumnDataType getFinalResultColumnType() { + if (_isBoolean) { + return ColumnDataType.BOOLEAN; + } else { + return ColumnDataType.INT; + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java new file mode 100644 index 0000000..fa3c15f --- /dev/null +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.LongLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for LastWithTime calculations for data column with long type. 
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'long') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the long data column to be calculated last on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class LastLongValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Long> { + + private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR + = new LongLongPair(Long.MIN_VALUE, Long.MIN_VALUE); + + public LastLongValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol) { + super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE); + } + + @Override + public ValueLongPair<Long> constructValueLongPair(Long value, long time) { + return new LongLongPair(value, time); + } + + @Override + public ValueLongPair<Long> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair(); + Long lastData = defaultValueLongPair.getValue(); + long lastTime = defaultValueLongPair.getTime(); + long[] longValues = blockValSet.getLongValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + long data = longValues[i]; + long time = timeValues[i]; + if (time >= lastTime) { + lastTime = time; + lastData = data; + } + } + setAggregationResult(aggregationResultHolder, lastData, lastTime); + } + + @Override + public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + long[] longValues = 
blockValSet.getLongValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + long data = longValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + long[] longValues = blockValSet.getLongValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + long value = longValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'LONG')"; + } + + @Override + public String getColumnName() { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_LONG"; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.LONG; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java new file mode 100644 index 0000000..cb3caa8 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.StringLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for LastWithTime calculations for data column with string type. 
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'string') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the string data column to be calculated last on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class LastStringValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<String> { + private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new StringLongPair("", Long.MIN_VALUE); + + public LastStringValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol) { + super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE); + } + + @Override + public ValueLongPair<String> constructValueLongPair(String value, long time) { + return new StringLongPair(value, time); + } + + @Override + public ValueLongPair<String> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair(); + String lastData = defaultValueLongPair.getValue(); + long lastTime = defaultValueLongPair.getTime(); + String[] stringValues = blockValSet.getStringValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + String data = stringValues[i]; + long time = timeValues[i]; + if (time >= lastTime) { + lastTime = time; + lastData = data; + } + } + setAggregationResult(aggregationResultHolder, lastData, lastTime); + } + + @Override + public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + 
String[] stringValues = blockValSet.getStringValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + String data = stringValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + String[] stringValues = blockValSet.getStringValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + String value = stringValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'STRING')"; + } + + @Override + public String getColumnName() { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_STRING"; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.STRING; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java new file mode 100644 index 0000000..adc91da --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * This function is used for LastWithTime calculations. 
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'dataType') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the column to be calculated last on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any + * Numeric column</li> + * <li>dataType: the data type of data column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>> + extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> { + protected final ExpressionContext _timeCol; + private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> _objectSerDe; + + public LastWithTimeAggregationFunction(ExpressionContext dataCol, + ExpressionContext timeCol, + ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) { + super(dataCol); + _timeCol = timeCol; + _objectSerDe = objectSerDe; + } + + public abstract ValueLongPair<V> constructValueLongPair(V value, long time); + + public abstract ValueLongPair<V> getDefaultValueTimePair(); + + public abstract void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet); + + public abstract void aggregateGroupResultWithRawDataSv(int length, + int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet); + + public abstract void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet); + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.LASTWITHTIME; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder 
createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + + BlockValSet blockValSet = blockValSetMap.get(_expression); + BlockValSet blockTimeSet = blockValSetMap.get(_timeCol); + if (blockValSet.getValueType() != DataType.BYTES) { + aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, blockTimeSet); + } else { + ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair(); + V lastData = defaultValueLongPair.getValue(); + long lastTime = defaultValueLongPair.getTime(); + // Serialized LastPair + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + for (int i = 0; i < length; i++) { + ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]); + V data = lastWithTimePair.getValue(); + long time = lastWithTimePair.getTime(); + if (time >= lastTime) { + lastTime = time; + lastData = data; + } + } + setAggregationResult(aggregationResultHolder, lastData, lastTime); + } + } + + protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, V data, long time) { + ValueLongPair lastWithTimePair = aggregationResultHolder.getResult(); + if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) { + aggregationResultHolder.setValue(constructValueLongPair(data, time)); + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + BlockValSet timeValSet = blockValSetMap.get(_timeCol); + if (blockValSet.getValueType() != DataType.BYTES) { + aggregateGroupResultWithRawDataSv(length, groupKeyArray, groupByResultHolder, + blockValSet, timeValSet); + } else { + // Serialized LastPair + 
byte[][] bytesValues = blockValSet.getBytesValuesSV(); + for (int i = 0; i < length; i++) { + ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]); + setGroupByResult(groupKeyArray[i], + groupByResultHolder, + lastWithTimePair.getValue(), + lastWithTimePair.getTime()); + } + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + BlockValSet timeValSet = blockValSetMap.get(_timeCol); + if (blockValSet.getValueType() != DataType.BYTES) { + aggregateGroupResultWithRawDataMv(length, groupKeysArray, groupByResultHolder, blockValSet, timeValSet); + } else { + // Serialized ValueTimePair + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + for (int i = 0; i < length; i++) { + ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]); + V data = lastWithTimePair.getValue(); + long time = lastWithTimePair.getTime(); + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, data, time); + } + } + } + } + + protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, V data, long time) { + ValueLongPair lastWithTimePair = groupByResultHolder.getResult(groupKey); + if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) { + groupByResultHolder.setValueForKey(groupKey, constructValueLongPair(data, time)); + } + } + + @Override + public ValueLongPair<V> extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + ValueLongPair lastWithTimePair = aggregationResultHolder.getResult(); + if (lastWithTimePair == null) { + return getDefaultValueTimePair(); + } else { + return lastWithTimePair; + } + } + + @Override + public ValueLongPair<V> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + ValueLongPair<V> lastWithTimePair = 
groupByResultHolder.getResult(groupKey); + if (lastWithTimePair == null) { + return getDefaultValueTimePair(); + } else { + return lastWithTimePair; + } + } + + @Override + public ValueLongPair<V> merge(ValueLongPair<V> intermediateResult1, ValueLongPair<V> intermediateResult2) { + if (intermediateResult1.getTime() >= intermediateResult2.getTime()) { + return intermediateResult1; + } else { + return intermediateResult2; + } + } + + @Override + public List<ExpressionContext> getInputExpressions() { + return Arrays.asList(_expression, _timeCol); + } + + @Override + public boolean isIntermediateResultComparable() { + return false; + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return ColumnDataType.OBJECT; + } + + @Override + public V extractFinalResult(ValueLongPair<V> intermediateResult) { + return intermediateResult.getValue(); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java index 8eea09b..e030f4c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java @@ -164,6 +164,13 @@ public class BrokerRequestToQueryContextConverter { for (String expression : stringExpressions) { arguments.add(RequestContextUtils.getExpressionFromPQL(expression)); } + } else if (functionName.equalsIgnoreCase(AggregationFunctionType.LASTWITHTIME.getName())) { + // For LASTWITHTIME query, only the first two arguments are expression, third one is literal if available + arguments.add(RequestContextUtils.getExpressionFromPQL(stringExpressions.get(0))); + arguments.add(RequestContextUtils.getExpressionFromPQL(stringExpressions.get(1))); + for (int i = 2; i < 
numArguments; i++) { + arguments.add(ExpressionContext.forLiteral(stringExpressions.get(i))); + } } else { // For non-DISTINCT query, only the first argument is expression, others are literals // NOTE: We directly use the string as the literal value because of the legacy behavior of PQL compiler diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java index 8e4c6df..3b8bfc4 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java @@ -34,8 +34,14 @@ import org.apache.commons.lang.RandomStringUtils; import org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction; import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction; import org.apache.pinot.segment.local.customobject.AvgPair; +import org.apache.pinot.segment.local.customobject.DoubleLongPair; +import org.apache.pinot.segment.local.customobject.FloatLongPair; +import org.apache.pinot.segment.local.customobject.IntLongPair; +import org.apache.pinot.segment.local.customobject.LongLongPair; import org.apache.pinot.segment.local.customobject.MinMaxRangePair; import org.apache.pinot.segment.local.customobject.QuantileDigest; +import org.apache.pinot.segment.local.customobject.StringLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -127,6 +133,76 @@ public class ObjectSerDeUtilsTest { } @Test + public void testIntValueTimePair() { + for (int i = 0; i < NUM_ITERATIONS; i++) { + ValueLongPair<Integer> expected = new IntLongPair(RANDOM.nextInt(), RANDOM.nextLong()); + + byte[] bytes = ObjectSerDeUtils.serialize(expected); + ValueLongPair<Integer> actual = ObjectSerDeUtils.deserialize(bytes, + 
ObjectSerDeUtils.ObjectType.IntLongPair); + + assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE); + assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE); + } + } + + @Test + public void testLongValueTimePair() { + for (int i = 0; i < NUM_ITERATIONS; i++) { + ValueLongPair<Long> expected = new LongLongPair(RANDOM.nextLong(), RANDOM.nextLong()); + + byte[] bytes = ObjectSerDeUtils.serialize(expected); + ValueLongPair<Long> actual = ObjectSerDeUtils.deserialize(bytes, + ObjectSerDeUtils.ObjectType.LongLongPair); + + assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE); + assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE); + } + } + + @Test + public void testFloatValueTimePair() { + for (int i = 0; i < NUM_ITERATIONS; i++) { + ValueLongPair<Float> expected = new FloatLongPair(RANDOM.nextFloat(), RANDOM.nextLong()); + + byte[] bytes = ObjectSerDeUtils.serialize(expected); + ValueLongPair<Float> actual = ObjectSerDeUtils.deserialize(bytes, ObjectSerDeUtils.ObjectType.FloatLongPair); + + assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE); + assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE); + } + } + + @Test + public void testDoubleValueTimePair() { + for (int i = 0; i < NUM_ITERATIONS; i++) { + ValueLongPair<Double> expected = new DoubleLongPair(RANDOM.nextDouble(), RANDOM.nextLong()); + + byte[] bytes = ObjectSerDeUtils.serialize(expected); + ValueLongPair<Double> actual = ObjectSerDeUtils.deserialize(bytes, + ObjectSerDeUtils.ObjectType.DoubleLongPair); + + assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE); + assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE); + } + } + + @Test + public void testStringValueTimePair() { + for (int i = 0; i < NUM_ITERATIONS; i++) { + ValueLongPair<String> expected = new StringLongPair(String.valueOf(RANDOM.nextDouble()), RANDOM.nextLong()); + + String temp = new String(expected.getValue().getBytes()); + byte[] 
bytes = ObjectSerDeUtils.serialize(expected); + ValueLongPair<String> actual = ObjectSerDeUtils.deserialize(bytes, + ObjectSerDeUtils.ObjectType.StringLongPair); + + assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE); + assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE); + } + } + + @Test public void testHyperLogLog() { for (int i = 0; i < NUM_ITERATIONS; i++) { HyperLogLog expected = new HyperLogLog(7); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java index e5e0e22..9c730f1 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java @@ -87,6 +87,48 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getColumnName(), "mode_column"); assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("LaStWiThTiMe", "(column,timeColumn,'BOOLEAN')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof LastIntValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_BOOLEAN"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("LaStWiThTiMe", "(column,timeColumn,'INT')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof LastIntValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.LASTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_INT"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("LaStWiThTiMe", "(column,timeColumn,'LONG')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof LastLongValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_LONG"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("LaStWiThTiMe", "(column,timeColumn,'FLOAT')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof LastFloatValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_FLOAT"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("LaStWiThTiMe", "(column,timeColumn,'DOUBLE')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof LastDoubleValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_DOUBLE"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("LaStWiThTiMe", "(column,timeColumn,'STRING')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + 
assertTrue(aggregationFunction instanceof LastStringValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_STRING"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("MiNmAxRaNgE"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); assertTrue(aggregationFunction instanceof MinMaxRangeAggregationFunction); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/LastWithTimeQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/LastWithTimeQueriesTest.java new file mode 100644 index 0000000..75d78cc --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/queries/LastWithTimeQueriesTest.java @@ -0,0 +1,548 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.queries; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.response.broker.AggregationResult; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.GroupByResult; +import org.apache.pinot.common.utils.HashUtil; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.query.AggregationGroupByOperator; +import org.apache.pinot.core.operator.query.AggregationOperator; +import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; +import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; 
+import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Queries test for LASTWITHTIME queries. + */ +@SuppressWarnings("rawtypes") +public class LastWithTimeQueriesTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "LastQueriesTest"); + private static final String RAW_TABLE_NAME = "testTable"; + private static final String SEGMENT_NAME = "testSegment"; + private static final Random RANDOM = new Random(); + + private static final int NUM_RECORDS = 2000; + private static final int MAX_VALUE = 1000; + + private static final String BOOL_COLUMN = "boolColumn"; + private static final String BOOL_NO_DICT_COLUMN = "boolNoDictColumn"; + private static final String INT_COLUMN = "intColumn"; + private static final String INT_MV_COLUMN = "intMvColumn"; + private static final String INT_NO_DICT_COLUMN = "intNoDictColumn"; + private static final String LONG_COLUMN = "longColumn"; + private static final String LONG_MV_COLUMN = "longMvColumn"; + private static final String LONG_NO_DICT_COLUMN = "longNoDictColumn"; + private static final String FLOAT_COLUMN = "floatColumn"; + private static final String FLOAT_MV_COLUMN = "floatMvColumn"; + private static final String FLOAT_NO_DICT_COLUMN = "floatNoDictColumn"; + private static final String DOUBLE_COLUMN = "doubleColumn"; + private static final String DOUBLE_MV_COLUMN = "doubleMvColumn"; + private static final String DOUBLE_NO_DICT_COLUMN = "doubleNoDictColumn"; + private static final String STRING_COLUMN = "stringColumn"; + private static final String STRING_MV_COLUMN = "stringMvColumn"; + private static final String STRING_NO_DICT_COLUMN = "stringNoDictColumn"; + private static final String TIME_COLUMN = "timestampColumn"; + private static final Schema SCHEMA = new Schema.SchemaBuilder() + .addSingleValueDimension(BOOL_COLUMN, 
DataType.BOOLEAN) + .addSingleValueDimension(BOOL_NO_DICT_COLUMN, DataType.BOOLEAN) + .addSingleValueDimension(INT_COLUMN, DataType.INT) + .addMultiValueDimension(INT_MV_COLUMN, DataType.INT) + .addSingleValueDimension(INT_NO_DICT_COLUMN, DataType.INT) + .addSingleValueDimension(LONG_COLUMN, DataType.LONG) + .addMultiValueDimension(LONG_MV_COLUMN, DataType.LONG) + .addSingleValueDimension(LONG_NO_DICT_COLUMN, DataType.LONG) + .addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT) + .addMultiValueDimension(FLOAT_MV_COLUMN, DataType.FLOAT) + .addSingleValueDimension(FLOAT_NO_DICT_COLUMN, DataType.FLOAT) + .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE) + .addMultiValueDimension(DOUBLE_MV_COLUMN, DataType.DOUBLE) + .addSingleValueDimension(DOUBLE_NO_DICT_COLUMN, DataType.DOUBLE) + .addSingleValueDimension(STRING_COLUMN, DataType.STRING) + .addMultiValueDimension(STRING_MV_COLUMN, DataType.STRING) + .addSingleValueDimension(STRING_NO_DICT_COLUMN, DataType.STRING) + .addSingleValueDimension(TIME_COLUMN, DataType.LONG).build(); + private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setNoDictionaryColumns( + Lists.newArrayList(INT_NO_DICT_COLUMN, LONG_NO_DICT_COLUMN, FLOAT_NO_DICT_COLUMN, DOUBLE_NO_DICT_COLUMN)) + .build(); + private static final double DELTA = 0.00001; + + private Boolean _expectedResultLastBoolean; + private Integer _expectedResultLastInt; + private Long _expectedResultLastLong; + private Float _expectedResultLastFloat; + private Double _expectedResultLastDouble; + private String _expectedResultLastString; + private Map<Integer, Boolean> _boolGroupValues; + private Map<Integer, Integer> _intGroupValues; + private Map<Integer, Long> _longGroupValues; + private Map<Integer, Float> _floatGroupValues; + private Map<Integer, Double> _doubleGroupValues; + private Map<Integer, String> _stringGroupValues; + private IndexSegment _indexSegment; + private List<IndexSegment> 
_indexSegments; + + @Override + protected String getFilter() { + // NOTE: Use a match all filter to switch between DictionaryBasedAggregationOperator and AggregationOperator + return " WHERE intColumn >= 0"; + } + + @Override + protected IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return _indexSegments; + } + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteDirectory(INDEX_DIR); + + List<GenericRow> records = new ArrayList<>(NUM_RECORDS); + int hashMapCapacity = HashUtil.getHashMapCapacity(MAX_VALUE); + _boolGroupValues = new HashMap<>(); + _intGroupValues = new HashMap<>(); + _longGroupValues = new HashMap<>(); + _floatGroupValues = new HashMap<>(); + _doubleGroupValues = new HashMap<>(); + _stringGroupValues = new HashMap<>(); + for (int i = 0; i < NUM_RECORDS; i++) { + boolean boolValue = RANDOM.nextBoolean(); + int intValue = RANDOM.nextInt(MAX_VALUE); + long longValue = RANDOM.nextLong(); + float floatValue = RANDOM.nextFloat(); + double doubleValue = RANDOM.nextDouble(); + String strValue = String.valueOf(RANDOM.nextDouble()); + GenericRow record = new GenericRow(); + record.putValue(BOOL_COLUMN, boolValue); + record.putValue(BOOL_NO_DICT_COLUMN, boolValue); + record.putValue(INT_COLUMN, intValue); + record.putValue(INT_MV_COLUMN, new Integer[]{intValue, intValue}); + record.putValue(INT_NO_DICT_COLUMN, intValue); + record.putValue(LONG_COLUMN, longValue); + record.putValue(LONG_MV_COLUMN, new Long[]{longValue, longValue}); + record.putValue(LONG_NO_DICT_COLUMN, longValue); + record.putValue(FLOAT_COLUMN, floatValue); + record.putValue(FLOAT_MV_COLUMN, new Float[]{floatValue, floatValue}); + record.putValue(FLOAT_NO_DICT_COLUMN, floatValue); + record.putValue(DOUBLE_COLUMN, doubleValue); + record.putValue(DOUBLE_MV_COLUMN, new Double[]{doubleValue, doubleValue}); + record.putValue(DOUBLE_NO_DICT_COLUMN, doubleValue); + record.putValue(STRING_COLUMN, 
strValue); + record.putValue(STRING_MV_COLUMN, new String[]{strValue, strValue}); + record.putValue(STRING_NO_DICT_COLUMN, strValue); + record.putValue(TIME_COLUMN, (long) i); + if (i == NUM_RECORDS - 1) { + _expectedResultLastBoolean = boolValue; + _expectedResultLastInt = intValue; + _expectedResultLastLong = longValue; + _expectedResultLastFloat = floatValue; + _expectedResultLastDouble = doubleValue; + _expectedResultLastString = strValue; + } + _boolGroupValues.put(intValue, boolValue); + _intGroupValues.put(intValue, intValue); + _longGroupValues.put(intValue, longValue); + _floatGroupValues.put(intValue, floatValue); + _doubleGroupValues.put(intValue, doubleValue); + _stringGroupValues.put(intValue, strValue); + records.add(record); + } + + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + segmentGeneratorConfig.setTableName(RAW_TABLE_NAME); + segmentGeneratorConfig.setSegmentName(SEGMENT_NAME); + segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath()); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records)); + driver.build(); + + ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + } + + @Test + public void testAggregationOnly() { + String query = "SELECT LASTWITHTIME(boolColumn,timestampColumn, BOOLEAN)," + + " LASTWITHTIME(intColumn,timestampColumn, Int)," + + " LASTWITHTIME(longColumn,timestampColumn, Long)," + + " LASTWITHTIME(floatColumn,timestampColumn, Float)," + + " LASTWITHTIME(doubleColumn,timestampColumn, Double)," + + " LASTWITHTIME(stringColumn,timestampColumn, String) FROM testTable"; + + // Inner segment + Operator operator = getOperatorForPqlQuery(query); + assertTrue(operator instanceof AggregationOperator); + 
IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock(); + QueriesTestUtils + .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 7 * NUM_RECORDS, + NUM_RECORDS); + List<Object> aggregationResultsWithoutFilter = resultsBlock.getAggregationResult(); + + operator = getOperatorForPqlQueryWithFilter(query); + assertTrue(operator instanceof AggregationOperator); + IntermediateResultsBlock resultsBlockWithFilter = ((AggregationOperator) operator).nextBlock(); + QueriesTestUtils + .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 7 * NUM_RECORDS, + NUM_RECORDS); + List<Object> aggregationResultWithFilter = resultsBlockWithFilter.getAggregationResult(); + + assertNotNull(aggregationResultsWithoutFilter); + assertNotNull(aggregationResultWithFilter); + assertEquals(aggregationResultsWithoutFilter.size(), aggregationResultWithFilter.size()); + for (int i = 0; i < aggregationResultsWithoutFilter.size(); i++) { + assertTrue(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(i)).compareTo( + (ValueLongPair<Integer>) aggregationResultWithFilter.get(i)) == 0); + } + assertEquals((((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(0))).getValue() != 0, + _expectedResultLastBoolean.booleanValue()); + assertEquals(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(1)).getValue(), _expectedResultLastInt); + assertEquals(((ValueLongPair<Long>) aggregationResultsWithoutFilter.get(2)).getValue(), _expectedResultLastLong); + assertEquals(((ValueLongPair<Float>) aggregationResultsWithoutFilter.get(3)).getValue(), _expectedResultLastFloat); + assertEquals(((ValueLongPair<Double>) aggregationResultsWithoutFilter.get(4)).getValue(), + _expectedResultLastDouble); + assertEquals(((ValueLongPair<String>) aggregationResultsWithoutFilter.get(5)).getValue(), + _expectedResultLastString); + + BrokerResponseNative brokerResponse = 
getBrokerResponseForPqlQuery(query); + + Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); + List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults(); + Assert.assertEquals(aggregationResults.size(), 6); + Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()), + _expectedResultLastBoolean.booleanValue()); + Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()), + _expectedResultLastInt.intValue()); + Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()), + _expectedResultLastLong.longValue()); + Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()), + _expectedResultLastFloat.floatValue(), DELTA); + Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()), + _expectedResultLastDouble.doubleValue(), DELTA); + Assert.assertEquals(aggregationResults.get(5).getValue().toString(), + _expectedResultLastString); + + brokerResponse = getBrokerResponseForPqlQueryWithFilter(query); + Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); + aggregationResults = brokerResponse.getAggregationResults(); + Assert.assertEquals(aggregationResults.size(), 6); + Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()), + _expectedResultLastBoolean.booleanValue()); + Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()), + 
_expectedResultLastInt.intValue()); + Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()), + _expectedResultLastLong.longValue()); + Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()), + _expectedResultLastFloat.floatValue(), DELTA); + Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()), + _expectedResultLastDouble.doubleValue(), DELTA); + Assert.assertEquals(aggregationResults.get(5).getValue().toString(), + _expectedResultLastString); + } + + @Test + public void testAggregationOnlyNoDictionary() { + String query = + "SELECT LASTWITHTIME(boolNoDictColumn,timestampColumn,boolean)," + + " LASTWITHTIME(intNoDictColumn,timestampColumn,int)," + + " LASTWITHTIME(longNoDictColumn,timestampColumn,long)," + + " LASTWITHTIME(floatNoDictColumn,timestampColumn,float)," + + " LASTWITHTIME(doubleNoDictColumn,timestampColumn,double)," + + " LASTWITHTIME(stringNoDictColumn,timestampColumn,string) FROM testTable"; + + // Inner segment + Operator operator = getOperatorForPqlQuery(query); + assertTrue(operator instanceof AggregationOperator); + IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock(); + QueriesTestUtils + .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 7 * NUM_RECORDS, + NUM_RECORDS); + List<Object> aggregationResultsWithoutFilter = resultsBlock.getAggregationResult(); + + operator = getOperatorForPqlQueryWithFilter(query); + assertTrue(operator instanceof AggregationOperator); + IntermediateResultsBlock resultsBlockWithFilter = ((AggregationOperator) operator).nextBlock(); + QueriesTestUtils + .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 7 * NUM_RECORDS, + NUM_RECORDS); + List<Object> aggregationResultWithFilter = resultsBlockWithFilter.getAggregationResult(); + + assertNotNull(aggregationResultsWithoutFilter); + 
assertNotNull(aggregationResultWithFilter); + assertEquals(aggregationResultsWithoutFilter.size(), aggregationResultWithFilter.size()); + for (int i = 0; i < aggregationResultsWithoutFilter.size(); i++) { + assertTrue(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(i)).compareTo( + (ValueLongPair<Integer>) aggregationResultWithFilter.get(i)) == 0); + } + + assertEquals(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(0)).getValue() != 0, + _expectedResultLastBoolean.booleanValue()); + assertEquals(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(1)).getValue(), _expectedResultLastInt); + assertEquals(((ValueLongPair<Long>) aggregationResultsWithoutFilter.get(2)).getValue(), _expectedResultLastLong); + assertEquals(((ValueLongPair<Float>) aggregationResultsWithoutFilter.get(3)).getValue(), _expectedResultLastFloat); + assertEquals(((ValueLongPair<Double>) aggregationResultsWithoutFilter.get(4)).getValue(), + _expectedResultLastDouble); + assertEquals(((ValueLongPair<String>) aggregationResultsWithoutFilter.get(5)).getValue(), + _expectedResultLastString); + + BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); + + Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); + List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults(); + Assert.assertEquals(aggregationResults.size(), 6); + Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()), + _expectedResultLastBoolean.booleanValue()); + Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()), + _expectedResultLastInt.intValue()); + Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()), + 
_expectedResultLastLong.longValue()); + Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()), + _expectedResultLastFloat, DELTA); + Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()), + _expectedResultLastDouble, DELTA); + Assert.assertEquals(aggregationResults.get(5).getValue().toString(), + _expectedResultLastString); + + brokerResponse = getBrokerResponseForPqlQueryWithFilter(query); + Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); + aggregationResults = brokerResponse.getAggregationResults(); + Assert.assertEquals(aggregationResults.size(), 6); + Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()), + _expectedResultLastBoolean.booleanValue()); + Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()), + _expectedResultLastInt.intValue()); + Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()), + _expectedResultLastLong.longValue()); + Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()), + _expectedResultLastFloat, DELTA); + Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()), + _expectedResultLastDouble, DELTA); + Assert.assertEquals(aggregationResults.get(5).getValue().toString(), + _expectedResultLastString); + } + + @Test + public void testAggregationGroupBySv() { + String query = + "SELECT LASTWITHTIME(boolColumn,timestampColumn,boolean)," + + " LASTWITHTIME(intColumn,timestampColumn,int)," + + " LASTWITHTIME(longColumn,timestampColumn,long)," + + " LASTWITHTIME(floatColumn,timestampColumn,float)," + + " LASTWITHTIME(doubleColumn,timestampColumn,double)," + + " 
LASTWITHTIME(stringColumn,timestampColumn,string) FROM testTable GROUP BY intColumn"; + + verifyAggregationResultsFromInnerSegments(query, 7); + + verifyAggregationResultsFromInterSegments(query, 7); + } + + @Test + public void testAggregationGroupBySvNoDictionary() { + String query = + "SELECT LASTWITHTIME(boolNoDictColumn,timestampColumn,boolean)," + + " LASTWITHTIME(intNoDictColumn,timestampColumn,int)," + + " LASTWITHTIME(longNoDictColumn,timestampColumn,long)," + + " LASTWITHTIME(floatNoDictColumn,timestampColumn,float)," + + " LASTWITHTIME(doubleNoDictColumn,timestampColumn,double)," + + " LASTWITHTIME(stringNoDictColumn,timestampColumn,string)" + + " FROM testTable GROUP BY intNoDictColumn"; + + verifyAggregationResultsFromInnerSegments(query, 7); + + verifyAggregationResultsFromInterSegments(query, 7); + } + + @Test + public void testAggregationGroupByMv() { + String query = + "SELECT LASTWITHTIME(boolColumn,timestampColumn,boolean)," + + " LASTWITHTIME(intColumn,timestampColumn,int)," + + " LASTWITHTIME(longColumn,timestampColumn,long)," + + " LASTWITHTIME(floatColumn,timestampColumn,float)," + + " LASTWITHTIME(doubleColumn,timestampColumn,double)," + + " LASTWITHTIME(stringColumn,timestampColumn,string) FROM testTable GROUP BY intMvColumn"; + + verifyAggregationResultsFromInnerSegments(query, 8); + + verifyAggregationResultsFromInterSegments(query, 8); + } + + @Test + public void testAggregationGroupByMvNoDictionary() { + String query = + "SELECT LASTWITHTIME(boolNoDictColumn,timestampColumn,boolean)," + + " LASTWITHTIME(intNoDictColumn,timestampColumn,int)," + + " LASTWITHTIME(longNoDictColumn,timestampColumn,long)," + + " LASTWITHTIME(floatNoDictColumn,timestampColumn,float)," + + " LASTWITHTIME(doubleNoDictColumn,timestampColumn,double)," + + " LASTWITHTIME(stringNoDictColumn,timestampColumn,string) FROM testTable GROUP BY intMvColumn"; + + verifyAggregationResultsFromInnerSegments(query, 8); + + verifyAggregationResultsFromInterSegments(query, 8); + } 
+ + private void verifyAggregationResultsFromInnerSegments(String query, int numOfColumns) { + // Inner segment + Operator operator = getOperatorForPqlQuery(query); + assertTrue(operator instanceof AggregationGroupByOperator); + IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator) operator).nextBlock(); + QueriesTestUtils + .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), + NUM_RECORDS, + 0, + numOfColumns * NUM_RECORDS, + NUM_RECORDS); + AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); + assertNotNull(aggregationGroupByResult); + int numGroups = 0; + Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator(); + while (groupKeyIterator.hasNext()) { + numGroups++; + GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next(); + Integer key = (Integer) groupKey._keys[0]; + assertTrue(_intGroupValues.containsKey(key)); + assertEquals( + ((ValueLongPair<Integer>) aggregationGroupByResult.getResultForGroupId(0, groupKey._groupId)).getValue() + != 0, + _boolGroupValues.get(key).booleanValue()); + assertEquals( + ((ValueLongPair<Integer>) aggregationGroupByResult.getResultForGroupId(1, groupKey._groupId)).getValue(), + _intGroupValues.get(key)); + assertEquals( + ((ValueLongPair<Long>) aggregationGroupByResult.getResultForGroupId(2, groupKey._groupId)).getValue(), + _longGroupValues.get(key)); + assertEquals( + ((ValueLongPair<Float>) aggregationGroupByResult.getResultForGroupId(3, groupKey._groupId)).getValue(), + _floatGroupValues.get(key)); + assertEquals( + ((ValueLongPair<Double>) aggregationGroupByResult.getResultForGroupId(4, groupKey._groupId)).getValue(), + _doubleGroupValues.get(key)); + assertEquals( + ((ValueLongPair<String>) aggregationGroupByResult.getResultForGroupId(5, groupKey._groupId)).getValue(), + _stringGroupValues.get(key)); + } + assertEquals(numGroups, _intGroupValues.size()); + } + + private void 
verifyAggregationResultsFromInterSegments(String query, int numOfColumns) { + BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); + // Inter segments (expect 4 * inner segment result) + Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * numOfColumns * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); + + List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults(); + Assert.assertEquals(aggregationResults.size(), 6); + Assert.assertNull(aggregationResults.get(0).getValue()); + for (GroupByResult intGroupByResult : aggregationResults.get(1).getGroupByResult()) { + assertEquals(intGroupByResult.getGroup().size(), 1); + assertTrue(_intGroupValues.containsKey(Integer.parseInt(intGroupByResult.getGroup().get(0)))); + assertEquals(Integer.parseInt(intGroupByResult.getValue().toString()), + _intGroupValues.get(Integer.parseInt(intGroupByResult.getGroup().get(0))).intValue()); + } + + Assert.assertNull(aggregationResults.get(1).getValue()); + for (GroupByResult longGroupByResult : aggregationResults.get(2).getGroupByResult()) { + assertEquals(longGroupByResult.getGroup().size(), 1); + assertTrue(_longGroupValues.containsKey(Integer.parseInt(longGroupByResult.getGroup().get(0)))); + assertEquals(Long.parseLong(longGroupByResult.getValue().toString()), + _longGroupValues.get(Integer.parseInt(longGroupByResult.getGroup().get(0))), DELTA); + } + + Assert.assertNull(aggregationResults.get(2).getValue()); + for (GroupByResult floatGroupByResult : aggregationResults.get(3).getGroupByResult()) { + assertEquals(floatGroupByResult.getGroup().size(), 1); + assertTrue(_floatGroupValues.containsKey(Integer.parseInt(floatGroupByResult.getGroup().get(0)))); + assertEquals(Double.parseDouble(floatGroupByResult.getValue().toString()), + 
_floatGroupValues.get(Integer.parseInt(floatGroupByResult.getGroup().get(0))), DELTA); + } + + Assert.assertNull(aggregationResults.get(3).getValue()); + for (GroupByResult doubleGroupByResult : aggregationResults.get(4).getGroupByResult()) { + assertEquals(doubleGroupByResult.getGroup().size(), 1); + assertTrue(_doubleGroupValues.containsKey(Integer.parseInt(doubleGroupByResult.getGroup().get(0)))); + assertEquals(Double.parseDouble(doubleGroupByResult.getValue().toString()), + _doubleGroupValues.get(Integer.parseInt(doubleGroupByResult.getGroup().get(0))), DELTA); + } + } + + @AfterClass + public void tearDown() + throws IOException { + _indexSegment.destroy(); + FileUtils.deleteDirectory(INDEX_DIR); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/DoubleLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/DoubleLongPair.java new file mode 100644 index 0000000..890a00d --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/DoubleLongPair.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.customobject; + +import java.nio.ByteBuffer; + + +public class DoubleLongPair extends ValueLongPair<Double> { + + public DoubleLongPair(Double value, long time) { + super(value, time); + } + + public static DoubleLongPair fromBytes(byte[] bytes) { + return fromByteBuffer(ByteBuffer.wrap(bytes)); + } + + public static DoubleLongPair fromByteBuffer(ByteBuffer byteBuffer) { + return new DoubleLongPair(byteBuffer.getDouble(), byteBuffer.getLong()); + } + + @Override + public byte[] toBytes() { + ByteBuffer byteBuffer = ByteBuffer.allocate(Double.BYTES + Long.BYTES); + byteBuffer.putDouble(_value); + byteBuffer.putLong(_time); + return byteBuffer.array(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/FloatLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/FloatLongPair.java new file mode 100644 index 0000000..94fcb31 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/FloatLongPair.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.customobject; + +import java.nio.ByteBuffer; + + +public class FloatLongPair extends ValueLongPair<Float> { + + public FloatLongPair(Float value, long time) { + super(value, time); + } + + public static FloatLongPair fromBytes(byte[] bytes) { + return fromByteBuffer(ByteBuffer.wrap(bytes)); + } + + public static FloatLongPair fromByteBuffer(ByteBuffer byteBuffer) { + return new FloatLongPair(byteBuffer.getFloat(), byteBuffer.getLong()); + } + + @Override + public byte[] toBytes() { + ByteBuffer byteBuffer = ByteBuffer.allocate(Float.BYTES + Long.BYTES); + byteBuffer.putFloat(_value); + byteBuffer.putLong(_time); + return byteBuffer.array(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/IntLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/IntLongPair.java new file mode 100644 index 0000000..191a931 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/IntLongPair.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.customobject; + +import java.nio.ByteBuffer; + + +public class IntLongPair extends ValueLongPair<Integer> { + + public IntLongPair(Integer value, long time) { + super(value, time); + } + + public static IntLongPair fromBytes(byte[] bytes) { + return fromByteBuffer(ByteBuffer.wrap(bytes)); + } + + public static IntLongPair fromByteBuffer(ByteBuffer byteBuffer) { + return new IntLongPair(byteBuffer.getInt(), byteBuffer.getLong()); + } + + @Override + public byte[] toBytes() { + ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES + Long.BYTES); + byteBuffer.putInt(_value); + byteBuffer.putLong(_time); + return byteBuffer.array(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LongLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LongLongPair.java new file mode 100644 index 0000000..0ffa345 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LongLongPair.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.customobject; + +import java.nio.ByteBuffer; + + +public class LongLongPair extends ValueLongPair<Long> { + + public LongLongPair(Long value, long time) { + super(value, time); + } + + public static LongLongPair fromBytes(byte[] bytes) { + return fromByteBuffer(ByteBuffer.wrap(bytes)); + } + + public static LongLongPair fromByteBuffer(ByteBuffer byteBuffer) { + return new LongLongPair(byteBuffer.getLong(), byteBuffer.getLong()); + } + + @Override + public byte[] toBytes() { + ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES + Long.BYTES); + byteBuffer.putLong(_value); + byteBuffer.putLong(_time); + return byteBuffer.array(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/StringLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/StringLongPair.java new file mode 100644 index 0000000..bfcdeae --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/StringLongPair.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.customobject; + +import java.nio.ByteBuffer; + + +public class StringLongPair extends ValueLongPair<String> { + + public StringLongPair(String value, long time) { + super(value, time); + } + + public static StringLongPair fromBytes(byte[] bytes) { + return fromByteBuffer(ByteBuffer.wrap(bytes)); + } + + public static StringLongPair fromByteBuffer(ByteBuffer byteBuffer) { + int len = byteBuffer.getInt(); + byte[] bytes = new byte[len]; + byteBuffer.get(bytes); + return new StringLongPair(new String(bytes), byteBuffer.getLong()); + } + + @Override + public byte[] toBytes() { + int len = _value.length(); + ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES + len + Long.BYTES); + byteBuffer.putInt(len); + byteBuffer.put(_value.getBytes()); + byteBuffer.putLong(_time); + return byteBuffer.array(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java new file mode 100644 index 0000000..81e2ded --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
/**
 * Base class for a value of type {@code V} paired with a {@code long} time.
 *
 * <p>Pairs are ordered by time first; pairs with the same time are ordered by value. {@link #equals(Object)} and
 * {@link #hashCode()} use the same two fields, so {@code a.compareTo(b) == 0} agrees with {@code a.equals(b)}
 * whenever {@code V}'s own {@code compareTo} agrees with its {@code equals}.
 *
 * @param <V> the value type
 */
public abstract class ValueLongPair<V extends Comparable<V>> implements Comparable<ValueLongPair<V>> {
  protected V _value;
  protected long _time;

  /**
   * @param value the value to hold
   * @param time the time associated with the value
   */
  public ValueLongPair(V value, long time) {
    _value = value;
    _time = time;
  }

  /** Returns the value of this pair. */
  public V getValue() {
    return _value;
  }

  /** Returns the time of this pair. */
  public long getTime() {
    return _time;
  }

  /** Serializes this pair to bytes; the layout is defined by each subclass. */
  public abstract byte[] toBytes();

  /**
   * Compares by time, then by value when the times are equal.
   *
   * @param o the pair to compare against
   * @return a negative, zero or positive number as this pair is less than, equal to or greater than {@code o}
   * @throws NullPointerException if the times are equal and this pair's value is {@code null}
   */
  @Override
  public int compareTo(ValueLongPair<V> o) {
    int timeComparison = Long.compare(_time, o.getTime());
    if (timeComparison != 0) {
      return timeComparison;
    }
    return _value.compareTo(o.getValue());
  }

  /**
   * Two pairs are equal when both their times and their values are equal, regardless of the concrete subclass.
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof ValueLongPair)) {
      return false;
    }
    ValueLongPair<?> that = (ValueLongPair<?>) obj;
    return _time == that._time && java.util.Objects.equals(_value, that._value);
  }

  @Override
  public int hashCode() {
    return 31 * Long.hashCode(_time) + java.util.Objects.hashCode(_value);
  }
}