This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b3172464ce add FIRSTWITHTIME aggregate function support #7647 (#8181)
b3172464ce is described below
commit b3172464cedff45a1af2be59db0822a6aeb6f403
Author: Yash Agarwal <[email protected]>
AuthorDate: Wed May 4 23:38:15 2022 +0530
add FIRSTWITHTIME aggregate function support #7647 (#8181)
---
.../function/AggregationFunctionTypeTest.java | 2 +
.../function/AggregationFunctionFactory.java | 30 ++
...irstDoubleValueWithTimeAggregationFunction.java | 126 ++++++
...FirstFloatValueWithTimeAggregationFunction.java | 127 ++++++
.../FirstIntValueWithTimeAggregationFunction.java | 142 +++++++
.../FirstLongValueWithTimeAggregationFunction.java | 126 ++++++
...irstStringValueWithTimeAggregationFunction.java | 124 ++++++
.../function/FirstWithTimeAggregationFunction.java | 222 +++++++++++
.../function/AggregationFunctionFactoryTest.java | 42 ++
.../pinot/queries/FirstWithTimeQueriesTest.java | 434 +++++++++++++++++++++
.../pinot/segment/spi/AggregationFunctionType.java | 1 +
11 files changed, 1376 insertions(+)
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index ea9f69a815..f7a9ca16b0 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -33,6 +33,8 @@ public class AggregationFunctionTypeTest {
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("SuM"),
AggregationFunctionType.SUM);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("AvG"),
AggregationFunctionType.AVG);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MoDe"),
AggregationFunctionType.MODE);
+
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("FiRsTwItHtImE"),
+ AggregationFunctionType.FIRSTWITHTIME);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("LaStWiThTiMe"),
AggregationFunctionType.LASTWITHTIME);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmAxRaNgE"),
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 74c815edf5..fc684b434d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -160,6 +160,36 @@ public class AggregationFunctionFactory {
return new AvgAggregationFunction(firstArgument);
case MODE:
return new ModeAggregationFunction(arguments);
+ case FIRSTWITHTIME:
+ if (arguments.size() == 3) {
+ ExpressionContext timeCol = arguments.get(1);
+ ExpressionContext dataType = arguments.get(2);
+ if (dataType.getType() != ExpressionContext.Type.LITERAL) {
+ throw new IllegalArgumentException("Third argument of
firstWithTime Function should be literal."
+ + " The function can be used as firstWithTime(dataColumn,
timeColumn, 'dataType')");
+ }
+ FieldSpec.DataType fieldDataType
+ =
FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase());
+ switch (fieldDataType) {
+ case BOOLEAN:
+ case INT:
+ return new FirstIntValueWithTimeAggregationFunction(
+ firstArgument, timeCol, fieldDataType ==
FieldSpec.DataType.BOOLEAN);
+ case LONG:
+ return new
FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol);
+ case FLOAT:
+ return new
FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
+ case DOUBLE:
+ return new
FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
+ case STRING:
+ return new
FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol);
+ default:
+ throw new IllegalArgumentException("Unsupported Value Type
for firstWithTime Function:" + dataType);
+ }
+ } else {
+ throw new IllegalArgumentException("Three arguments are required
for firstWithTime Function."
+ + " The function can be used as firstWithTime(dataColumn,
timeColumn, 'dataType')");
+ }
case LASTWITHTIME:
if (arguments.size() == 3) {
ExpressionContext timeCol = arguments.get(1);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..56aff397df
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations for data column with
double type.
+ * <p>The function can be used as FirstWithTime(dataExpression,
timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the double data column to be
calculated first on</li>
+ * <li>timeExpression: expression that contains the column to be used to
decide which data is first, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstDoubleValueWithTimeAggregationFunction extends
FirstWithTimeAggregationFunction<Double> {
+
+ private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+ = new DoubleLongPair(Double.NaN, Long.MAX_VALUE);
+
+ public FirstDoubleValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol) {
+ super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+ }
+
+ @Override
+ public ValueLongPair<Double> constructValueLongPair(Double value, long time)
{
+ return new DoubleLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<Double> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+ Double firstData = defaultValueLongPair.getValue();
+ long firstTime = defaultValueLongPair.getTime();
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ double data = doubleValues[i];
+ long time = timeValues[i];
+ if (time <= firstTime) {
+ firstTime = time;
+ firstData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ double data = doubleValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ double value = doubleValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
+ }
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return getType().getName().toLowerCase() + "(" + _expression + "," +
_timeCol + ",'DOUBLE')";
+ }
+
+ @Override
+ public String getColumnName() {
+ return getType().getName() + "_" + _expression + "_" + _timeCol +
"_DOUBLE";
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.DOUBLE;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..3801ff17a3
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations for data column with
float type.
+ * <p>The function can be used as FirstWithTime(dataExpression,
timeExpression, 'float')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the float data column to be
calculated first on</li>
+ * <li>timeExpression: expression that contains the column to be used to
decide which data is first, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstFloatValueWithTimeAggregationFunction extends
FirstWithTimeAggregationFunction<Float> {
+
+ private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new
FloatLongPair(Float.NaN, Long.MAX_VALUE);
+
+ public FirstFloatValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol) {
+ super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
+ }
+
+ @Override
+ public ValueLongPair<Float> constructValueLongPair(Float value, long time) {
+ return new FloatLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<Float> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
+ Float firstData = defaultValueLongPair.getValue();
+ long firstTime = defaultValueLongPair.getTime();
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ float data = floatValues[i];
+ long time = timeValues[i];
+ if (time <= firstTime) {
+ firstTime = time;
+ firstData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length,
+ int[] groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ float data = floatValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ float value = floatValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
+ }
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return getType().getName().toLowerCase() + "(" + _expression + "," +
_timeCol + ",'FLOAT')";
+ }
+
+ @Override
+ public String getColumnName() {
+ return getType().getName() + "_" + _expression + "_" + _timeCol + "_FLOAT";
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.FLOAT;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..be151b9994
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations for data column with
int/boolean type.
+ * <p>The function can be used as FirstWithTime(dataExpression,
timeExpression, 'int')
+ * or FirstWithTime(dataExpression, timeExpression, 'boolean')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the int/boolean data column
to be calculated first on</li>
+ * <li>timeExpression: expression that contains the column to be used to
decide which data is first, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstIntValueWithTimeAggregationFunction extends
FirstWithTimeAggregationFunction<Integer> {
+
+ private final static ValueLongPair<Integer> DEFAULT_VALUE_TIME_PAIR
+ = new IntLongPair(Integer.MIN_VALUE, Long.MAX_VALUE);
+ private final boolean _isBoolean;
+
+ public FirstIntValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol,
+ boolean isBoolean) {
+ super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
+ _isBoolean = isBoolean;
+ }
+
+ @Override
+ public ValueLongPair<Integer> constructValueLongPair(Integer value, long
time) {
+ return new IntLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<Integer> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
+ Integer firstData = defaultValueLongPair.getValue();
+ long firstTime = defaultValueLongPair.getTime();
+ int[] intValues = blockValSet.getIntValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ int data = intValues[i];
+ long time = timeValues[i];
+ if (time <= firstTime) {
+ firstTime = time;
+ firstData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ int[] intValues = blockValSet.getIntValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ int data = intValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ int[] intValues = blockValSet.getIntValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ int value = intValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
+ }
+ }
+
+ @Override
+ public String getResultColumnName() {
+ if (_isBoolean) {
+ return getType().getName().toLowerCase() + "(" + _expression + "," +
_timeCol + ",'BOOLEAN')";
+ } else {
+ return getType().getName().toLowerCase() + "(" + _expression + "," +
_timeCol + ",'INT')";
+ }
+ }
+
+ @Override
+ public String getColumnName() {
+ if (_isBoolean) {
+ return getType().getName() + "_" + _expression + "_" + _timeCol +
"_BOOLEAN";
+ } else {
+ return getType().getName() + "_" + _expression + "_" + _timeCol + "_INT";
+ }
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ if (_isBoolean) {
+ return ColumnDataType.BOOLEAN;
+ } else {
+ return ColumnDataType.INT;
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..960cda8820
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations for data column with
long type.
+ * <p>The function can be used as FirstWithTime(dataExpression,
timeExpression, 'long')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the long data column to be
calculated first on</li>
+ * <li>timeExpression: expression that contains the column to be used to
decide which data is first, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstLongValueWithTimeAggregationFunction extends
FirstWithTimeAggregationFunction<Long> {
+
+ private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR
+ = new LongLongPair(Long.MIN_VALUE, Long.MAX_VALUE);
+
+ public FirstLongValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol) {
+ super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE);
+ }
+
+ @Override
+ public ValueLongPair<Long> constructValueLongPair(Long value, long time) {
+ return new LongLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<Long> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair();
+ Long firstData = defaultValueLongPair.getValue();
+ long firstTime = defaultValueLongPair.getTime();
+ long[] longValues = blockValSet.getLongValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ long data = longValues[i];
+ long time = timeValues[i];
+ if (time <= firstTime) {
+ firstTime = time;
+ firstData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ long[] longValues = blockValSet.getLongValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ long data = longValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ long[] longValues = blockValSet.getLongValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ long value = longValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
+ }
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return getType().getName().toLowerCase() + "(" + _expression + "," +
_timeCol + ",'LONG')";
+ }
+
+ @Override
+ public String getColumnName() {
+ return getType().getName() + "_" + _expression + "_" + _timeCol + "_LONG";
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.LONG;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..b0ace7487e
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for FirstWithTime calculations for data column with
string type.
+ * <p>The function can be used as FirstWithTime(dataExpression,
timeExpression, 'string')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the string data column to be
calculated first on</li>
+ * <li>timeExpression: expression that contains the column to be used to
decide which data is first, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class FirstStringValueWithTimeAggregationFunction extends
FirstWithTimeAggregationFunction<String> {
+ private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new
StringLongPair("", Long.MAX_VALUE);
+
+ public FirstStringValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol) {
+ super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE);
+ }
+
+ @Override
+ public ValueLongPair<String> constructValueLongPair(String value, long time)
{
+ return new StringLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<String> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair();
+ String firstData = defaultValueLongPair.getValue();
+ long firstTime = defaultValueLongPair.getTime();
+ String[] stringValues = blockValSet.getStringValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ String data = stringValues[i];
+ long time = timeValues[i];
+ if (time <= firstTime) {
+ firstTime = time;
+ firstData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length, int[]
groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ String[] stringValues = blockValSet.getStringValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ String data = stringValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ String[] stringValues = blockValSet.getStringValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ String value = stringValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
+ }
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return getType().getName().toLowerCase() + "(" + _expression + "," +
_timeCol + ",'STRING')";
+ }
+
+ @Override
+ public String getColumnName() {
+ return getType().getName() + "_" + _expression + "_" + _timeCol +
"_STRING";
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.STRING;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
new file mode 100644
index 0000000000..7049d0f1db
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * This function is used for FirstWithTime calculations.
+ * <p>The function can be used as FirstWithTime(dataExpression,
timeExpression, 'dataType')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the column to be calculated
first on</li>
+ * <li>timeExpression: expression that contains the column to be used to
decide which data is first, can be any
+ * Numeric column</li>
+ * <li>dataType: the data type of data column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class FirstWithTimeAggregationFunction<V extends Comparable<V>>
+ extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+ protected final ExpressionContext _timeCol;
+ private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>>
_objectSerDe;
+
+ public FirstWithTimeAggregationFunction(ExpressionContext dataCol,
+ ExpressionContext timeCol,
+ ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
+ super(dataCol);
+ _timeCol = timeCol;
+ _objectSerDe = objectSerDe;
+ }
+
+ public abstract ValueLongPair<V> constructValueLongPair(V value, long time);
+
+ public abstract ValueLongPair<V> getDefaultValueTimePair();
+
+ public abstract void aggregateResultWithRawData(int length,
AggregationResultHolder aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet);
+
+ public abstract void aggregateGroupResultWithRawDataSv(int length,
+ int[] groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet);
+
+ public abstract void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet);
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.FIRSTWITHTIME;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+ if (blockValSet.getValueType() != DataType.BYTES) {
+ aggregateResultWithRawData(length, aggregationResultHolder, blockValSet,
blockTimeSet);
+ } else {
+ ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
+ V firstData = defaultValueLongPair.getValue();
+ long firstTime = defaultValueLongPair.getTime();
+ // Serialized FirstPair
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ ValueLongPair<V> firstWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
+ V data = firstWithTimePair.getValue();
+ long time = firstWithTimePair.getTime();
+ if (time <= firstTime) {
+ firstTime = time;
+ firstData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ }
+ }
+
+ protected void setAggregationResult(AggregationResultHolder
aggregationResultHolder, V data, long time) {
+ ValueLongPair firstWithTimePair = aggregationResultHolder.getResult();
+ if (firstWithTimePair == null || time <= firstWithTimePair.getTime()) {
+ aggregationResultHolder.setValue(constructValueLongPair(data, time));
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+ if (blockValSet.getValueType() != DataType.BYTES) {
+ aggregateGroupResultWithRawDataSv(length, groupKeyArray,
groupByResultHolder,
+ blockValSet, timeValSet);
+ } else {
+ // Serialized FirstPair
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ ValueLongPair<V> firstWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
+ setGroupByResult(groupKeyArray[i],
+ groupByResultHolder,
+ firstWithTimePair.getValue(),
+ firstWithTimePair.getTime());
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+ if (blockValSet.getValueType() != DataType.BYTES) {
+ aggregateGroupResultWithRawDataMv(length, groupKeysArray,
groupByResultHolder, blockValSet, timeValSet);
+ } else {
+ // Serialized ValueTimePair
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ ValueLongPair<V> firstWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
+ V data = firstWithTimePair.getValue();
+ long time = firstWithTimePair.getTime();
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, data, time);
+ }
+ }
+ }
+ }
+
+ protected void setGroupByResult(int groupKey, GroupByResultHolder
groupByResultHolder, V data, long time) {
+ ValueLongPair firstWithTimePair = groupByResultHolder.getResult(groupKey);
+ if (firstWithTimePair == null || time <= firstWithTimePair.getTime()) {
+ groupByResultHolder.setValueForKey(groupKey,
constructValueLongPair(data, time));
+ }
+ }
+
+ @Override
+ public ValueLongPair<V> extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ ValueLongPair firstWithTimePair = aggregationResultHolder.getResult();
+ if (firstWithTimePair == null) {
+ return getDefaultValueTimePair();
+ } else {
+ return firstWithTimePair;
+ }
+ }
+
+ @Override
+ public ValueLongPair<V> extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ ValueLongPair<V> firstWithTimePair =
groupByResultHolder.getResult(groupKey);
+ if (firstWithTimePair == null) {
+ return getDefaultValueTimePair();
+ } else {
+ return firstWithTimePair;
+ }
+ }
+
+ @Override
+ public ValueLongPair<V> merge(ValueLongPair<V> intermediateResult1,
ValueLongPair<V> intermediateResult2) {
+ if (intermediateResult1.getTime() <= intermediateResult2.getTime()) {
+ return intermediateResult1;
+ } else {
+ return intermediateResult2;
+ }
+ }
+
+ @Override
+ public List<ExpressionContext> getInputExpressions() {
+ return Arrays.asList(_expression, _timeCol);
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return ColumnDataType.OBJECT;
+ }
+
+ @Override
+ public V extractFinalResult(ValueLongPair<V> intermediateResult) {
+ return intermediateResult.getValue();
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index d51d4704f6..58deac76ee 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -87,6 +87,48 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getColumnName(), "mode_column");
assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+ function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'BOOLEAN')");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
FirstIntValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.FIRSTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(),
"firstWithTime_column_timeColumn_BOOLEAN");
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
+ function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'INT')");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
FirstIntValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.FIRSTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(),
"firstWithTime_column_timeColumn_INT");
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
+ function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'LONG')");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
FirstLongValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.FIRSTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(),
"firstWithTime_column_timeColumn_LONG");
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
+ function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'FLOAT')");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
FirstFloatValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.FIRSTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(),
"firstWithTime_column_timeColumn_FLOAT");
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
+ function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'DOUBLE')");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
FirstDoubleValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.FIRSTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(),
"firstWithTime_column_timeColumn_DOUBLE");
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
+ function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'STRING')");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof
FirstStringValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.FIRSTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(),
"firstWithTime_column_timeColumn_STRING");
+ assertEquals(aggregationFunction.getResultColumnName(),
function.toString());
+
function = getFunction("LaStWiThTiMe", "(column,timeColumn,'BOOLEAN')");
aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof
LastIntValueWithTimeAggregationFunction);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/FirstWithTimeQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/FirstWithTimeQueriesTest.java
new file mode 100644
index 0000000000..e7be59a2c8
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/FirstWithTimeQueriesTest.java
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Queries test for FIRSTWITHTIME queries.
+ */
+public class FirstWithTimeQueriesTest extends BaseQueriesTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"FirstQueriesTest");
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
+ private static final Random RANDOM = new Random();
+
+ private static final int NUM_RECORDS = 2000;
+ private static final int MAX_VALUE = 1000;
+
+ private static final String BOOL_COLUMN = "boolColumn";
+ private static final String BOOL_NO_DICT_COLUMN = "boolNoDictColumn";
+ private static final String INT_COLUMN = "intColumn";
+ private static final String INT_MV_COLUMN = "intMVColumn";
+ private static final String INT_NO_DICT_COLUMN = "intNoDictColumn";
+ private static final String LONG_COLUMN = "longColumn";
+ private static final String LONG_NO_DICT_COLUMN = "longNoDictColumn";
+ private static final String FLOAT_COLUMN = "floatColumn";
+ private static final String FLOAT_NO_DICT_COLUMN = "floatNoDictColumn";
+ private static final String DOUBLE_COLUMN = "doubleColumn";
+ private static final String DOUBLE_NO_DICT_COLUMN = "doubleNoDictColumn";
+ private static final String STRING_COLUMN = "stringColumn";
+ private static final String STRING_NO_DICT_COLUMN = "stringNoDictColumn";
+ private static final String TIME_COLUMN = "timestampColumn";
+ private static final Schema SCHEMA = new Schema.SchemaBuilder()
+ .addSingleValueDimension(BOOL_COLUMN, DataType.BOOLEAN)
+ .addSingleValueDimension(BOOL_NO_DICT_COLUMN, DataType.BOOLEAN)
+ .addSingleValueDimension(INT_COLUMN, DataType.INT)
+ .addMultiValueDimension(INT_MV_COLUMN, DataType.INT)
+ .addSingleValueDimension(INT_NO_DICT_COLUMN, DataType.INT)
+ .addSingleValueDimension(LONG_COLUMN, DataType.LONG)
+ .addSingleValueDimension(LONG_NO_DICT_COLUMN, DataType.LONG)
+ .addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+ .addSingleValueDimension(FLOAT_NO_DICT_COLUMN, DataType.FLOAT)
+ .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE)
+ .addSingleValueDimension(DOUBLE_NO_DICT_COLUMN, DataType.DOUBLE)
+ .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+ .addSingleValueDimension(STRING_NO_DICT_COLUMN, DataType.STRING)
+ .addSingleValueDimension(TIME_COLUMN, DataType.LONG).build();
+ private static final TableConfig TABLE_CONFIG = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNoDictionaryColumns(
+ Lists.newArrayList(INT_NO_DICT_COLUMN, LONG_NO_DICT_COLUMN,
FLOAT_NO_DICT_COLUMN, DOUBLE_NO_DICT_COLUMN))
+ .build();
+
+ private Boolean _expectedResultFirstBoolean;
+ private Integer _expectedResultFirstInt;
+ private Long _expectedResultFirstLong;
+ private Float _expectedResultFirstFloat;
+ private Double _expectedResultFirstDouble;
+ private String _expectedResultFirstString;
+ private Map<Integer, Boolean> _boolGroupValues;
+ private Map<Integer, Integer> _intGroupValues;
+ private Map<Integer, Long> _longGroupValues;
+ private Map<Integer, Float> _floatGroupValues;
+ private Map<Integer, Double> _doubleGroupValues;
+ private Map<Integer, String> _stringGroupValues;
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
+
+ @Override
+ protected String getFilter() {
+ return "";
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(INDEX_DIR);
+
+ List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+ _boolGroupValues = new HashMap<>();
+ _intGroupValues = new HashMap<>();
+ _longGroupValues = new HashMap<>();
+ _floatGroupValues = new HashMap<>();
+ _doubleGroupValues = new HashMap<>();
+ _stringGroupValues = new HashMap<>();
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ boolean boolValue = RANDOM.nextBoolean();
+ int intValue = RANDOM.nextInt(MAX_VALUE);
+ long longValue = RANDOM.nextLong();
+ float floatValue = RANDOM.nextFloat();
+ double doubleValue = RANDOM.nextDouble();
+ String strValue = String.valueOf(RANDOM.nextDouble());
+ GenericRow record = new GenericRow();
+ record.putValue(BOOL_COLUMN, boolValue);
+ record.putValue(BOOL_NO_DICT_COLUMN, boolValue);
+ record.putValue(INT_COLUMN, intValue);
+ record.putValue(INT_MV_COLUMN, new Integer[]{intValue, intValue});
+ record.putValue(INT_NO_DICT_COLUMN, intValue);
+ record.putValue(LONG_COLUMN, longValue);
+ record.putValue(LONG_NO_DICT_COLUMN, longValue);
+ record.putValue(FLOAT_COLUMN, floatValue);
+ record.putValue(FLOAT_NO_DICT_COLUMN, floatValue);
+ record.putValue(DOUBLE_COLUMN, doubleValue);
+ record.putValue(DOUBLE_NO_DICT_COLUMN, doubleValue);
+ record.putValue(STRING_COLUMN, strValue);
+ record.putValue(STRING_NO_DICT_COLUMN, strValue);
+ record.putValue(TIME_COLUMN, (long) i);
+ if (i == 0) {
+ _expectedResultFirstBoolean = boolValue;
+ _expectedResultFirstInt = intValue;
+ _expectedResultFirstLong = longValue;
+ _expectedResultFirstFloat = floatValue;
+ _expectedResultFirstDouble = doubleValue;
+ _expectedResultFirstString = strValue;
+ }
+ _boolGroupValues.putIfAbsent(intValue, boolValue);
+ _intGroupValues.putIfAbsent(intValue, intValue);
+ _longGroupValues.putIfAbsent(intValue, longValue);
+ _floatGroupValues.putIfAbsent(intValue, floatValue);
+ _doubleGroupValues.putIfAbsent(intValue, doubleValue);
+ _stringGroupValues.putIfAbsent(intValue, strValue);
+ records.add(record);
+ }
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+ segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+ segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+ segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+ driver.build();
+
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+ }
+
+ @Test
+ public void testAggregationOnly() {
+ String query = "SELECT "
+ + "FIRSTWITHTIME(boolColumn, timestampColumn, 'BOOLEAN'), "
+ + "FIRSTWITHTIME(intColumn, timestampColumn, 'INT'), "
+ + "FIRSTWITHTIME(longColumn, timestampColumn, 'LONG'), "
+ + "FIRSTWITHTIME(floatColumn, timestampColumn, 'FLOAT'), "
+ + "FIRSTWITHTIME(doubleColumn, timestampColumn, 'DOUBLE'), "
+ + "FIRSTWITHTIME(stringColumn, timestampColumn, 'STRING') "
+ + "FROM testTable";
+
+ // Inner segment
+ AggregationOperator aggregationOperator = getOperator(query);
+ IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
+
QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
NUM_RECORDS, 0,
+ 7 * NUM_RECORDS, NUM_RECORDS);
+ List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(((IntLongPair) aggregationResult.get(0)).getValue() != 0,
_expectedResultFirstBoolean.booleanValue());
+ assertEquals(((IntLongPair) aggregationResult.get(1)).getValue(),
_expectedResultFirstInt);
+ assertEquals(((LongLongPair) aggregationResult.get(2)).getValue(),
_expectedResultFirstLong);
+ assertEquals(((FloatLongPair) aggregationResult.get(3)).getValue(),
_expectedResultFirstFloat);
+ assertEquals(((DoubleLongPair) aggregationResult.get(4)).getValue(),
_expectedResultFirstDouble);
+ assertEquals(((StringLongPair) aggregationResult.get(5)).getValue(),
_expectedResultFirstString);
+
+ // Inter segments (expect 4 * inner segment result)
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema = new DataSchema(new String[]{
+ "firstwithtime(boolColumn,timestampColumn,'BOOLEAN')",
+ "firstwithtime(intColumn,timestampColumn,'INT')",
+ "firstwithtime(longColumn,timestampColumn,'LONG')",
+ "firstwithtime(floatColumn,timestampColumn,'FLOAT')",
+ "firstwithtime(doubleColumn,timestampColumn,'DOUBLE')",
+ "firstwithtime(stringColumn,timestampColumn,'STRING')"
+ }, new ColumnDataType[]{
+ ColumnDataType.BOOLEAN,
+ ColumnDataType.INT,
+ ColumnDataType.LONG,
+ ColumnDataType.FLOAT,
+ ColumnDataType.DOUBLE,
+ ColumnDataType.STRING
+ });
+ Object[] expectedResults = new Object[]{
+ _expectedResultFirstBoolean,
+ _expectedResultFirstInt,
+ _expectedResultFirstLong,
+ _expectedResultFirstFloat,
+ _expectedResultFirstDouble,
+ _expectedResultFirstString
+ };
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 * NUM_RECORDS,
0L, 4 * 7 * NUM_RECORDS,
+ 4 * NUM_RECORDS, new ResultTable(expectedDataSchema,
Collections.singletonList(expectedResults)));
+ }
+
+ @Test
+ public void testAggregationOnlyNoDictionary() {
+ String query = "SELECT "
+ + "FIRSTWITHTIME(boolNoDictColumn,timestampColumn,'boolean'), "
+ + "FIRSTWITHTIME(intNoDictColumn,timestampColumn,'int'), "
+ + "FIRSTWITHTIME(longNoDictColumn,timestampColumn,'long'), "
+ + "FIRSTWITHTIME(floatNoDictColumn,timestampColumn,'float'), "
+ + "FIRSTWITHTIME(doubleNoDictColumn,timestampColumn,'double'), "
+ + "FIRSTWITHTIME(stringNoDictColumn,timestampColumn,'string') "
+ + "FROM testTable";
+
+ // Inner segment
+ AggregationOperator aggregationOperator = getOperator(query);
+ IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
+
QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
NUM_RECORDS, 0,
+ 7 * NUM_RECORDS, NUM_RECORDS);
+ List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(((IntLongPair) aggregationResult.get(0)).getValue() != 0,
_expectedResultFirstBoolean.booleanValue());
+ assertEquals(((IntLongPair) aggregationResult.get(1)).getValue(),
_expectedResultFirstInt);
+ assertEquals(((LongLongPair) aggregationResult.get(2)).getValue(),
_expectedResultFirstLong);
+ assertEquals(((FloatLongPair) aggregationResult.get(3)).getValue(),
_expectedResultFirstFloat);
+ assertEquals(((DoubleLongPair) aggregationResult.get(4)).getValue(),
_expectedResultFirstDouble);
+ assertEquals(((StringLongPair) aggregationResult.get(5)).getValue(),
_expectedResultFirstString);
+
+ // Inter segments (expect 4 * inner segment result)
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema = new DataSchema(new String[]{
+ "firstwithtime(boolNoDictColumn,timestampColumn,'BOOLEAN')",
+ "firstwithtime(intNoDictColumn,timestampColumn,'INT')",
+ "firstwithtime(longNoDictColumn,timestampColumn,'LONG')",
+ "firstwithtime(floatNoDictColumn,timestampColumn,'FLOAT')",
+ "firstwithtime(doubleNoDictColumn,timestampColumn,'DOUBLE')",
+ "firstwithtime(stringNoDictColumn,timestampColumn,'STRING')"
+ }, new ColumnDataType[]{
+ ColumnDataType.BOOLEAN,
+ ColumnDataType.INT,
+ ColumnDataType.LONG,
+ ColumnDataType.FLOAT,
+ ColumnDataType.DOUBLE,
+ ColumnDataType.STRING
+ });
+ Object[] expectedResults = new Object[]{
+ _expectedResultFirstBoolean,
+ _expectedResultFirstInt,
+ _expectedResultFirstLong,
+ _expectedResultFirstFloat,
+ _expectedResultFirstDouble,
+ _expectedResultFirstString
+ };
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 * NUM_RECORDS,
0L, 4 * 7 * NUM_RECORDS,
+ 4 * NUM_RECORDS, new ResultTable(expectedDataSchema,
Collections.singletonList(expectedResults)));
+ }
+
+ @Test
+ public void testAggregationGroupBySV() {
+ String query = "SELECT intColumn AS key, "
+ + "FIRSTWITHTIME(boolColumn,timestampColumn,'boolean') AS v1, "
+ + "FIRSTWITHTIME(intColumn,timestampColumn,'int') AS v2, "
+ + "FIRSTWITHTIME(longColumn,timestampColumn,'long') AS v3, "
+ + "FIRSTWITHTIME(floatColumn,timestampColumn,'float') AS v4, "
+ + "FIRSTWITHTIME(doubleColumn,timestampColumn,'double') AS v5, "
+ + "FIRSTWITHTIME(stringColumn,timestampColumn,'string') AS v6 "
+ + "FROM testTable GROUP BY key";
+ verifyAggregationGroupBy(query, 7);
+ }
+
+ @Test
+ public void testAggregationGroupBySVNoDictionary() {
+ String query = "SELECT intNoDictColumn AS key, "
+ + "FIRSTWITHTIME(boolNoDictColumn,timestampColumn,'boolean') AS v1, "
+ + "FIRSTWITHTIME(intNoDictColumn,timestampColumn,'int') AS v2, "
+ + "FIRSTWITHTIME(longNoDictColumn,timestampColumn,'long') AS v3, "
+ + "FIRSTWITHTIME(floatNoDictColumn,timestampColumn,'float') AS v4, "
+ + "FIRSTWITHTIME(doubleNoDictColumn,timestampColumn,'double') AS v5, "
+ + "FIRSTWITHTIME(stringNoDictColumn,timestampColumn,'string') AS v6 "
+ + "FROM testTable GROUP BY key";
+ verifyAggregationGroupBy(query, 7);
+ }
+
+ @Test
+ public void testAggregationGroupByMV() {
+ String query = "SELECT intMVColumn AS key, "
+ + "FIRSTWITHTIME(boolColumn,timestampColumn,'boolean') AS v1, "
+ + "FIRSTWITHTIME(intColumn,timestampColumn,'int') AS v2, "
+ + "FIRSTWITHTIME(longColumn,timestampColumn,'long') AS v3, "
+ + "FIRSTWITHTIME(floatColumn,timestampColumn,'float') AS v4, "
+ + "FIRSTWITHTIME(doubleColumn,timestampColumn,'double') AS v5, "
+ + "FIRSTWITHTIME(stringColumn,timestampColumn,'string') AS v6 "
+ + "FROM testTable GROUP BY key";
+ verifyAggregationGroupBy(query, 8);
+ }
+
+ @Test
+ public void testAggregationGroupByMVNoDictionary() {
+ String query = "SELECT intMVColumn AS key, "
+ + "FIRSTWITHTIME(boolNoDictColumn,timestampColumn,'boolean') AS v1, "
+ + "FIRSTWITHTIME(intNoDictColumn,timestampColumn,'int') AS v2, "
+ + "FIRSTWITHTIME(longNoDictColumn,timestampColumn,'long') AS v3, "
+ + "FIRSTWITHTIME(floatNoDictColumn,timestampColumn,'float') AS v4, "
+ + "FIRSTWITHTIME(doubleNoDictColumn,timestampColumn,'double') AS v5, "
+ + "FIRSTWITHTIME(stringNoDictColumn,timestampColumn,'string') AS v6 "
+ + "FROM testTable GROUP BY key";
+ verifyAggregationGroupBy(query, 8);
+ }
+
+ private void verifyAggregationGroupBy(String query, int numProjectedColumns)
{
+ // Inner segment
+ AggregationGroupByOrderByOperator groupByOperator = getOperator(query);
+ IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
+
QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(),
NUM_RECORDS, 0,
+ numProjectedColumns * (long) NUM_RECORDS, NUM_RECORDS);
+ AggregationGroupByResult aggregationGroupByResult =
resultsBlock.getAggregationGroupByResult();
+ assertNotNull(aggregationGroupByResult);
+ int numGroups = 0;
+ Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator =
aggregationGroupByResult.getGroupKeyIterator();
+ while (groupKeyIterator.hasNext()) {
+ numGroups++;
+ GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+ Integer key = (Integer) groupKey._keys[0];
+ assertTrue(_intGroupValues.containsKey(key));
+ assertEquals(((IntLongPair)
aggregationGroupByResult.getResultForGroupId(0, groupKey._groupId)).getValue()
!= 0,
+ (boolean) _boolGroupValues.get(key));
+ assertEquals(((IntLongPair)
aggregationGroupByResult.getResultForGroupId(1, groupKey._groupId)).getValue(),
+ _intGroupValues.get(key));
+ assertEquals(((LongLongPair)
aggregationGroupByResult.getResultForGroupId(2, groupKey._groupId)).getValue(),
+ _longGroupValues.get(key));
+ assertEquals(((FloatLongPair)
aggregationGroupByResult.getResultForGroupId(3, groupKey._groupId)).getValue(),
+ _floatGroupValues.get(key));
+ assertEquals(((DoubleLongPair)
aggregationGroupByResult.getResultForGroupId(4, groupKey._groupId)).getValue(),
+ _doubleGroupValues.get(key));
+ assertEquals(((StringLongPair)
aggregationGroupByResult.getResultForGroupId(5, groupKey._groupId)).getValue(),
+ _stringGroupValues.get(key));
+ }
+ assertEquals(numGroups, _intGroupValues.size());
+
+ // Inter segments (expect 4 * inner segment result)
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+ assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+ assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 *
numProjectedColumns * (long) NUM_RECORDS);
+ assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+
+ ResultTable resultTable = brokerResponse.getResultTable();
+ DataSchema expectedDataSchema =
+ new DataSchema(new String[]{"key", "v1", "v2", "v3", "v4", "v5",
"v6"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.BOOLEAN, ColumnDataType.INT,
ColumnDataType.LONG,
+ ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING
+ });
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), 10);
+ for (Object[] row : rows) {
+ assertEquals(row.length, 7);
+ int key = (Integer) row[0];
+ assertEquals(row[1], _boolGroupValues.get(key));
+ assertEquals(row[2], _intGroupValues.get(key));
+ assertEquals(row[3], _longGroupValues.get(key));
+ assertEquals(row[4], _floatGroupValues.get(key));
+ assertEquals(row[5], _doubleGroupValues.get(key));
+ assertEquals(row[6], _stringGroupValues.get(key));
+ }
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ _indexSegment.destroy();
+ FileUtils.deleteDirectory(INDEX_DIR);
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 889ade6e42..a1a435600a 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -37,6 +37,7 @@ public enum AggregationFunctionType {
SUMPRECISION("sumPrecision"),
AVG("avg"),
MODE("mode"),
+ FIRSTWITHTIME("firstWithTime"),
LASTWITHTIME("lastWithTime"),
MINMAXRANGE("minMaxRange"),
DISTINCTCOUNT("distinctCount"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]