This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1231a2c985 [multistage] support aggregations that require intermediate representations (#10120)
1231a2c985 is described below
commit 1231a2c9852d9c22b1ae7268aab69357347b24b6
Author: Almog Gavra <[email protected]>
AuthorDate: Wed Jan 18 17:11:21 2023 -0800
[multistage] support aggregations that require intermediate representations (#10120)
* [multistage] support aggregations that require intermediate representations
* move CustomObject to top level class
* fix case when intermediate stage was first to aggregate
* address Rong's comments
* address feedback
* move InternalReduceFunctions to new package
---
.../java/org/apache/pinot/common/CustomObject.java | 30 +--
.../pinot/common/datablock/BaseDataBlock.java | 15 ++
.../apache/pinot/common/datablock/DataBlock.java | 3 +
.../pinot/common/datablock/DataBlockUtils.java | 13 +-
.../pinot/common/datatable/BaseDataTable.java | 1 +
.../apache/pinot/common/datatable/DataTable.java | 22 +--
.../pinot/common/datatable/DataTableImplV4.java | 1 +
.../org/apache/pinot/common/utils/DataSchema.java | 2 +
.../apache/pinot/core/common/ObjectSerDeUtils.java | 4 +-
.../core/common/datablock/DataBlockBuilder.java | 4 +-
.../common/datatable/BaseDataTableBuilder.java | 4 +-
.../function/AggregationFunctionFactory.java | 2 +
.../function/AggregationFunctionUtils.java | 3 +-
.../function/FourthMomentAggregationFunction.java | 8 +-
.../query/reduce/DistinctDataTableReducer.java | 3 +-
.../core/query/reduce/GroupByDataTableReducer.java | 3 +-
.../reduce/function/InternalReduceFunctions.java | 32 ++--
.../core/common/datatable/DataTableSerDeTest.java | 3 +-
.../calcite/rel/rules/PinotQueryRuleSets.java | 1 +
.../rules/PinotReduceAggregateFunctionsRule.java | 201 +++++++++++++++++++++
.../sql/fun/PinotBoolAndAggregateFunction.java | 4 +-
.../sql/fun/PinotBoolOrAggregateFunction.java | 4 +-
...ava => PinotFourthMomentAggregateFunction.java} | 11 +-
...on.java => PinotKurtosisAggregateFunction.java} | 11 +-
.../apache/calcite/sql/fun/PinotOperatorTable.java | 15 +-
...on.java => PinotSkewnessAggregateFunction.java} | 11 +-
.../query/planner/logical/RelToStageConverter.java | 8 +-
.../query/runtime/blocks/TransferableBlock.java | 3 +-
.../query/runtime/operator/AggregateOperator.java | 124 +++++++++----
.../LeafStageTransferableBlockOperator.java | 6 +-
.../query/runtime/plan/PhysicalPlanVisitor.java | 2 +-
.../pinot/query/service/QueryDispatcher.java | 3 +-
.../runtime/operator/AggregateOperatorTest.java | 30 +--
.../src/test/resources/queries/Skew.json | 86 +++++++++
.../pinot/segment/spi/AggregationFunctionType.java | 1 +
35 files changed, 548 insertions(+), 126 deletions(-)
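For orientation before the diffs: the flow this change enables can be sketched with a minimal, hypothetical driver (not part of the commit; it only uses PinotFourthMoment and InternalReduceFunctions methods that appear in this diff). Leaf stages accumulate raw values into a fourth-moment intermediate object, the intermediate stage combines the partial moments, and a reduce function turns the combined moment into the final SKEWNESS/KURTOSIS value.

import org.apache.pinot.core.query.reduce.function.InternalReduceFunctions;
import org.apache.pinot.segment.local.customobject.PinotFourthMoment;

public class FourthMomentFlowSketch {
  public static void main(String[] args) {
    // Leaf stage 1: accumulate raw values into an intermediate fourth moment.
    PinotFourthMoment leaf1 = new PinotFourthMoment();
    leaf1.increment(1.0);
    leaf1.increment(2.0);

    // Leaf stage 2: a partial moment computed on another server.
    PinotFourthMoment leaf2 = new PinotFourthMoment();
    leaf2.increment(3.0);
    leaf2.increment(10.0);

    // Intermediate stage: merge the partial moments (the MergeFourthMomentObject path).
    leaf1.combine(leaf2);

    // Final reduce: scalar functions turn the combined moment into the results.
    System.out.println("skewness = " + InternalReduceFunctions.skewnessReduce(leaf1));
    System.out.println("kurtosis = " + InternalReduceFunctions.kurtosisReduce(leaf1));
  }
}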
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
b/pinot-common/src/main/java/org/apache/pinot/common/CustomObject.java
similarity index 56%
copy from
pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
copy to pinot-common/src/main/java/org/apache/pinot/common/CustomObject.java
index 3d336c1070..355fff08db 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/CustomObject.java
@@ -17,21 +17,27 @@
* under the License.
*/
-package org.apache.calcite.sql.fun;
+package org.apache.pinot.common;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.util.Optionality;
+import java.nio.ByteBuffer;
-public class PinotBoolOrAggregateFunction extends SqlAggFunction {
+public class CustomObject {
+ public static final int NULL_TYPE_VALUE = 100;
- public PinotBoolOrAggregateFunction() {
- super("BOOL_OR", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
- null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
- false, false, Optionality.FORBIDDEN);
+ private final int _type;
+ private final ByteBuffer _buffer;
+
+ public CustomObject(int type, ByteBuffer buffer) {
+ _type = type;
+ _buffer = buffer;
+ }
+
+ public int getType() {
+ return _type;
+ }
+
+ public ByteBuffer getBuffer() {
+ return _buffer;
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
index ebbb6e7a97..27f0b5f732 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.datatable.DataTableImplV3;
import org.apache.pinot.common.datatable.DataTableUtils;
import org.apache.pinot.common.response.ProcessingException;
@@ -345,6 +346,20 @@ public abstract class BaseDataBlock implements DataBlock {
return strings;
}
+ @Nullable
+ @Override
+ public CustomObject getCustomObject(int rowId, int colId) {
+ int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
+ int type = _variableSizeData.getInt();
+ if (size == 0) {
+ assert type == CustomObject.NULL_TYPE_VALUE;
+ return null;
+ }
+ ByteBuffer buffer = _variableSizeData.slice();
+ buffer.limit(size);
+ return new CustomObject(type, buffer);
+ }
+
@Nullable
@Override
public RoaringBitmap getNullRowIds(int colId) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
index ed8d40760a..418426b4ac 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.utils.ByteArray;
@@ -74,6 +75,8 @@ public interface DataBlock {
String[] getStringArray(int rowId, int colId);
+ CustomObject getCustomObject(int rowId, int colId);
+
@Nullable
RoaringBitmap getNullRowIds(int colId);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 8b41969e8e..cd9a729c8b 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
@@ -87,14 +89,14 @@ public final class DataBlockUtils {
}
}
- public static List<Object[]> extractRows(DataBlock dataBlock) {
+ public static List<Object[]> extractRows(DataBlock dataBlock,
Function<CustomObject, Object> customObjectSerde) {
DataSchema dataSchema = dataBlock.getDataSchema();
DataSchema.ColumnDataType[] columnDataTypes =
dataSchema.getColumnDataTypes();
RoaringBitmap[] nullBitmaps = extractNullBitmaps(dataBlock);
int numRows = dataBlock.getNumberOfRows();
List<Object[]> rows = new ArrayList<>(numRows);
for (int rowId = 0; rowId < numRows; rowId++) {
- rows.add(extractRowFromDataBlock(dataBlock, rowId, columnDataTypes,
nullBitmaps));
+ rows.add(extractRowFromDataBlock(dataBlock, rowId, columnDataTypes,
nullBitmaps, customObjectSerde));
}
return rows;
}
@@ -189,8 +191,8 @@ public final class DataBlockUtils {
return nullBitmaps;
}
- public static Object[] extractRowFromDataBlock(DataBlock dataBlock, int
rowId, DataSchema.ColumnDataType[] dataTypes,
- RoaringBitmap[] nullBitmaps) {
+ private static Object[] extractRowFromDataBlock(DataBlock dataBlock, int
rowId, DataSchema.ColumnDataType[] dataTypes,
+ RoaringBitmap[] nullBitmaps, Function<CustomObject, Object>
customObjectSerde) {
int numColumns = nullBitmaps.length;
Object[] row = new Object[numColumns];
for (int colId = 0; colId < numColumns; colId++) {
@@ -250,6 +252,9 @@ public final class DataBlockUtils {
case TIMESTAMP_ARRAY:
row[colId] =
DataSchema.ColumnDataType.TIMESTAMP_ARRAY.convert(dataBlock.getLongArray(rowId,
colId));
break;
+ case OBJECT:
+ row[colId] =
customObjectSerde.apply(dataBlock.getCustomObject(rowId, colId));
+ break;
default:
throw new IllegalStateException(
String.format("Unsupported data type: %s for column: %s",
dataTypes[colId], colId));
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
index d4e493589a..06ba4b34f1 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 5a5f323c93..9bac9706a7 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -21,10 +21,10 @@ package org.apache.pinot.common.datatable;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.math.BigDecimal;
-import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.utils.ByteArray;
@@ -89,26 +89,6 @@ public interface DataTable {
DataTable toDataOnlyDataTable();
- class CustomObject {
- public static final int NULL_TYPE_VALUE = 100;
-
- private final int _type;
- private final ByteBuffer _buffer;
-
- public CustomObject(int type, ByteBuffer buffer) {
- _type = type;
- _buffer = buffer;
- }
-
- public int getType() {
- return _type;
- }
-
- public ByteBuffer getBuffer() {
- return _buffer;
- }
- }
-
enum MetadataValueType {
INT, LONG, STRING
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index 78fa5606b6..d4d27634f9 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 9c7b985bfd..4854f5bf22 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -422,6 +422,8 @@ public class DataSchema {
return toTimestampArray(value);
case BYTES_ARRAY:
return (byte[][]) value;
+ case OBJECT:
+ return (Serializable) value;
default:
throw new IllegalStateException(String.format("Cannot convert: '%s'
to type: %s", value, this));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index fbfee474e9..a01f02a5c9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -59,7 +59,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
-import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
@@ -1244,7 +1244,7 @@ public class ObjectSerDeUtils {
return SER_DES[objectTypeValue].serialize(value);
}
- public static <T> T deserialize(DataTable.CustomObject customObject) {
+ public static <T> T deserialize(CustomObject customObject) {
return (T)
SER_DES[customObject.getType()].deserialize(customObject.getBuffer());
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 19ed9cd05a..573b3beadf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -28,11 +28,11 @@ import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.datablock.ColumnarDataBlock;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.RowDataBlock;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.core.common.ObjectSerDeUtils;
@@ -524,7 +524,7 @@ public class DataBlockBuilder {
byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
if (value == null) {
byteBuffer.putInt(0);
-
builder._variableSizeDataOutputStream.writeInt(DataTable.CustomObject.NULL_TYPE_VALUE);
+
builder._variableSizeDataOutputStream.writeInt(CustomObject.NULL_TYPE_VALUE);
} else {
int objectTypeValue =
ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
index 9547924ac5..c0a8ff8ea1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.datatable.DataTableUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ObjectSerDeUtils;
@@ -103,7 +103,7 @@ public abstract class BaseDataTableBuilder implements
DataTableBuilder {
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
if (value == null) {
_currentRowDataByteBuffer.putInt(0);
-
_variableSizeDataOutputStream.writeInt(DataTable.CustomObject.NULL_TYPE_VALUE);
+ _variableSizeDataOutputStream.writeInt(CustomObject.NULL_TYPE_VALUE);
} else {
int objectTypeValue =
ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 9b571ff3c4..e7d1cba0e4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -297,6 +297,8 @@ public class AggregationFunctionFactory {
return new FourthMomentAggregationFunction(firstArgument,
FourthMomentAggregationFunction.Type.SKEWNESS);
case KURTOSIS:
return new FourthMomentAggregationFunction(firstArgument,
FourthMomentAggregationFunction.Type.KURTOSIS);
+ case FOURTHMOMENT:
+ return new FourthMomentAggregationFunction(firstArgument,
FourthMomentAggregationFunction.Type.MOMENT);
default:
throw new IllegalArgumentException();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 6b1dd21e3c..89e9d3d8e5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
@@ -142,7 +143,7 @@ public class AggregationFunctionUtils {
case DOUBLE:
return dataTable.getDouble(rowId, colId);
case OBJECT:
- DataTable.CustomObject customObject = dataTable.getCustomObject(rowId,
colId);
+ CustomObject customObject = dataTable.getCustomObject(rowId, colId);
return customObject != null ?
ObjectSerDeUtils.deserialize(customObject) : null;
default:
throw new IllegalStateException("Illegal column data type in
intermediate result: " + columnDataType);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
index 9cb06e4eeb..a6f4b707ff 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
@@ -36,7 +36,7 @@ public class FourthMomentAggregationFunction extends
BaseSingleInputAggregationF
private final Type _type;
enum Type {
- KURTOSIS, SKEWNESS
+ KURTOSIS, SKEWNESS, MOMENT
}
public FourthMomentAggregationFunction(ExpressionContext expression, Type
type) {
@@ -51,6 +51,8 @@ public class FourthMomentAggregationFunction extends
BaseSingleInputAggregationF
return AggregationFunctionType.KURTOSIS;
case SKEWNESS:
return AggregationFunctionType.SKEWNESS;
+ case MOMENT:
+ return AggregationFunctionType.FOURTHMOMENT;
default:
throw new IllegalArgumentException("Unexpected type " + _type);
}
@@ -159,6 +161,10 @@ public class FourthMomentAggregationFunction extends
BaseSingleInputAggregationF
return m4.kurtosis();
case SKEWNESS:
return m4.skew();
+ case MOMENT:
+ // this should never happen, as we're not extracting
+ // the final result when using this method
+ throw new UnsupportedOperationException("Fourth moment cannot be used
as aggregation function directly");
default:
throw new IllegalStateException("Unexpected value: " + _type);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index af25afe24e..de65c18657 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -77,7 +78,7 @@ public class DistinctDataTableReducer implements
DataTableReducer {
int numColumns = dataSchema.size();
if (numColumns == 1 && dataSchema.getColumnDataType(0) ==
ColumnDataType.OBJECT) {
// DistinctTable is still being returned as a single object
- DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
+ CustomObject customObject = dataTable.getCustomObject(0, 0);
assert customObject != null;
DistinctTable distinctTable =
ObjectSerDeUtils.deserialize(customObject);
if (!distinctTable.isEmpty()) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 960cf1cb07..cb5975d02b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerGauge;
@@ -318,7 +319,7 @@ public class GroupByDataTableReducer implements
DataTableReducer {
break;
case OBJECT:
// TODO: Move ser/de into AggregationFunction interface
- DataTable.CustomObject customObject =
dataTable.getCustomObject(rowId, colId);
+ CustomObject customObject =
dataTable.getCustomObject(rowId, colId);
if (customObject != null) {
values[colId] =
ObjectSerDeUtils.deserialize(customObject);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/function/InternalReduceFunctions.java
similarity index 54%
copy from
pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/function/InternalReduceFunctions.java
index 3610ce0c4d..e56e82298e 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/function/InternalReduceFunctions.java
@@ -17,21 +17,29 @@
* under the License.
*/
-package org.apache.calcite.sql.fun;
+package org.apache.pinot.core.query.reduce.function;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.util.Optionality;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
+import org.apache.pinot.spi.annotations.ScalarFunction;
-public class PinotBoolAndAggregateFunction extends SqlAggFunction {
+/**
+ * This class contains reduce functions used by the multistage engine for
+ * aggregations whose intermediate results must be reduced after the initial
+ * aggregation to produce the final result.
+ */
+public class InternalReduceFunctions {
+
+ private InternalReduceFunctions() {
+ }
+
+ @ScalarFunction
+ public static double skewnessReduce(PinotFourthMoment fourthMoment) {
+ return fourthMoment.skew();
+ }
- public PinotBoolAndAggregateFunction() {
- super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
- null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
- false, false, Optionality.FORBIDDEN);
+ @ScalarFunction
+ public static double kurtosisReduce(PinotFourthMoment fourthMoment) {
+ return fourthMoment.kurtosis();
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 22b0e92630..9b53e87354 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Random;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.datatable.DataTableFactory;
@@ -734,7 +735,7 @@ public class DataTableSerDeTest {
ERROR_MESSAGE);
break;
case OBJECT:
- DataTable.CustomObject customObject =
newDataTable.getCustomObject(rowId, colId);
+ CustomObject customObject = newDataTable.getCustomObject(rowId,
colId);
if (isNull) {
Assert.assertNull(customObject, ERROR_MESSAGE);
} else {
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 1828abd0e8..43ec25dfba 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -82,6 +82,7 @@ public class PinotQueryRuleSets {
CoreRules.AGGREGATE_UNION_AGGREGATE,
// reduce aggregate functions like AVG, STDDEV_POP etc.
+ PinotReduceAggregateFunctionsRule.INSTANCE,
CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
// remove unnecessary sort rule
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java
new file mode 100644
index 0000000000..5c6fd18e86
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.PinotFourthMomentAggregateFunction;
+import org.apache.calcite.sql.fun.PinotKurtosisAggregateFunction;
+import org.apache.calcite.sql.fun.PinotOperatorTable;
+import org.apache.calcite.sql.fun.PinotSkewnessAggregateFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.CompositeList;
+
+
+/**
+ * This rule rewrites aggregate functions when necessary for Pinot's
+ * multistage engine. For example, SKEWNESS must be rewritten into two
+ * parts: a multi-stage FOURTH_MOMENT calculation and then a scalar function
+ * that reduces the moment into the skewness at the end. This is to ensure
+ * that the aggregation computation can merge partial results from different
+ * intermediate nodes before reducing them into the final result.
+ *
+ * <p>This implementation closely follows Calcite's
+ * {@link AggregateReduceFunctionsRule}.
+ */
+public class PinotReduceAggregateFunctionsRule extends RelOptRule {
+
+ public static final PinotReduceAggregateFunctionsRule INSTANCE =
+ new PinotReduceAggregateFunctionsRule(PinotRuleUtils.PINOT_REL_FACTORY);
+
+ private static final Set<String> FUNCTIONS = ImmutableSet.of(
+ PinotSkewnessAggregateFunction.SKEWNESS,
+ PinotKurtosisAggregateFunction.KURTOSIS
+ );
+
+ protected PinotReduceAggregateFunctionsRule(RelBuilderFactory factory) {
+ super(operand(Aggregate.class, any()), factory, null);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ if (call.rels.length < 1) {
+ return false;
+ }
+
+ if (call.rel(0) instanceof Aggregate) {
+ Aggregate agg = call.rel(0);
+ for (AggregateCall aggCall : agg.getAggCallList()) {
+ if (canReduce(aggCall)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Aggregate oldAggRel = call.rel(0);
+ reduceAggs(call, oldAggRel);
+ }
+
+ private void reduceAggs(RelOptRuleCall ruleCall, Aggregate oldAggRel) {
+ RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+
+ List<AggregateCall> oldCalls = oldAggRel.getAggCallList();
+ final int groupCount = oldAggRel.getGroupCount();
+
+ final List<AggregateCall> newCalls = new ArrayList<>();
+ final Map<AggregateCall, RexNode> aggCallMapping = new HashMap<>();
+
+ final List<RexNode> projList = new ArrayList<>();
+
+ // pass through group key
+ for (int i = 0; i < groupCount; i++) {
+ projList.add(rexBuilder.makeInputRef(oldAggRel, i));
+ }
+
+ // List of input expressions. If a particular aggregate needs more, it
+ // will add an expression to the end, and we will create an extra project
+ final RelBuilder relBuilder = ruleCall.builder();
+ relBuilder.push(oldAggRel.getInput());
+ final List<RexNode> inputExprs = new ArrayList<>(relBuilder.fields());
+
+ // create new aggregate function calls and rest of project list together
+ for (AggregateCall oldCall : oldCalls) {
+ projList.add(
+ reduceAgg(oldAggRel, oldCall, newCalls, aggCallMapping, inputExprs));
+ }
+
+ final int extraArgCount = inputExprs.size() -
relBuilder.peek().getRowType().getFieldCount();
+ if (extraArgCount > 0) {
+ relBuilder.project(inputExprs,
+ CompositeList.of(
+ relBuilder.peek().getRowType().getFieldNames(),
+ Collections.nCopies(extraArgCount, null)));
+ }
+ newAggregateRel(relBuilder, oldAggRel, newCalls);
+ newCalcRel(relBuilder, oldAggRel.getRowType(), projList);
+ ruleCall.transformTo(relBuilder.build());
+ }
+
+ private RexNode reduceAgg(Aggregate oldAggRel, AggregateCall oldCall,
List<AggregateCall> newCalls,
+ Map<AggregateCall, RexNode> aggCallMapping, List<RexNode> inputExprs) {
+ if (canReduce(oldCall)) {
+ switch (oldCall.getAggregation().getName()) {
+ case PinotSkewnessAggregateFunction.SKEWNESS:
+ return reduceFourthMoment(oldAggRel, oldCall, newCalls,
aggCallMapping, false);
+ case PinotKurtosisAggregateFunction.KURTOSIS:
+ return reduceFourthMoment(oldAggRel, oldCall, newCalls,
aggCallMapping, true);
+ default:
+ throw new IllegalStateException("Unexpected aggregation name " +
oldCall.getAggregation().getName());
+ }
+ } else {
+ // anything else: preserve original call
+ RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+ final int nGroups = oldAggRel.getGroupCount();
+ return rexBuilder.addAggCall(oldCall,
+ nGroups,
+ newCalls,
+ aggCallMapping,
+ oldAggRel.getInput()::fieldIsNullable);
+ }
+ }
+
+ private RexNode reduceFourthMoment(Aggregate oldAggRel, AggregateCall
oldCall, List<AggregateCall> newCalls,
+ Map<AggregateCall, RexNode> aggCallMapping, boolean isKurtosis) {
+ final int nGroups = oldAggRel.getGroupCount();
+ final RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+ final AggregateCall fourthMomentCall =
+ AggregateCall.create(PinotFourthMomentAggregateFunction.INSTANCE,
+ oldCall.isDistinct(),
+ oldCall.isApproximate(),
+ oldCall.ignoreNulls(),
+ oldCall.getArgList(),
+ oldCall.filterArg,
+ oldCall.distinctKeys,
+ oldCall.collation,
+ oldAggRel.getGroupCount(),
+ oldAggRel.getInput(),
+ null,
+ null);
+
+ RexNode fmRef = rexBuilder.addAggCall(fourthMomentCall, nGroups, newCalls,
+ aggCallMapping, oldAggRel.getInput()::fieldIsNullable);
+
+ final RexNode skewRef = rexBuilder.makeCall(
+ isKurtosis ? PinotOperatorTable.KURTOSIS_REDUCE :
PinotOperatorTable.SKEWNESS_REDUCE,
+ fmRef);
+ return rexBuilder.makeCast(oldCall.getType(), skewRef);
+ }
+
+ private boolean canReduce(AggregateCall call) {
+ return FUNCTIONS.contains(call.getAggregation().getName());
+ }
+
+ protected void newAggregateRel(RelBuilder relBuilder,
+ Aggregate oldAggregate,
+ List<AggregateCall> newCalls) {
+ relBuilder.aggregate(
+ relBuilder.groupKey(oldAggregate.getGroupSet(),
oldAggregate.getGroupSets()),
+ newCalls);
+ }
+
+ protected void newCalcRel(RelBuilder relBuilder,
+ RelDataType rowType,
+ List<RexNode> exprs) {
+ relBuilder.project(exprs, rowType.getFieldNames());
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
index 3610ce0c4d..7963fd02e3 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
@@ -29,7 +29,9 @@ import org.apache.calcite.util.Optionality;
public class PinotBoolAndAggregateFunction extends SqlAggFunction {
- public PinotBoolAndAggregateFunction() {
+ public static final PinotBoolAndAggregateFunction INSTANCE = new
PinotBoolAndAggregateFunction();
+
+ private PinotBoolAndAggregateFunction() {
super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
false, false, Optionality.FORBIDDEN);
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
index 3d336c1070..547edef887 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
@@ -29,7 +29,9 @@ import org.apache.calcite.util.Optionality;
public class PinotBoolOrAggregateFunction extends SqlAggFunction {
- public PinotBoolOrAggregateFunction() {
+ public static final PinotBoolOrAggregateFunction INSTANCE = new
PinotBoolOrAggregateFunction();
+
+ private PinotBoolOrAggregateFunction() {
super("BOOL_OR", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
false, false, Optionality.FORBIDDEN);
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotFourthMomentAggregateFunction.java
similarity index 73%
copy from
pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
copy to
pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotFourthMomentAggregateFunction.java
index 3610ce0c4d..2d9ca9e3a0 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotFourthMomentAggregateFunction.java
@@ -24,14 +24,17 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Optionality;
-public class PinotBoolAndAggregateFunction extends SqlAggFunction {
+public class PinotFourthMomentAggregateFunction extends SqlAggFunction {
- public PinotBoolAndAggregateFunction() {
- super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
- null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ public static final PinotFourthMomentAggregateFunction INSTANCE = new
PinotFourthMomentAggregateFunction();
+
+ public PinotFourthMomentAggregateFunction() {
+ super("FOURTHMOMENT", null, SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(SqlTypeName.OTHER),
+ null, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION,
false, false, Optionality.FORBIDDEN);
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotKurtosisAggregateFunction.java
similarity index 74%
copy from
pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
copy to
pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotKurtosisAggregateFunction.java
index 3610ce0c4d..f2d7639625 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotKurtosisAggregateFunction.java
@@ -27,11 +27,14 @@ import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.util.Optionality;
-public class PinotBoolAndAggregateFunction extends SqlAggFunction {
+public class PinotKurtosisAggregateFunction extends SqlAggFunction {
- public PinotBoolAndAggregateFunction() {
- super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
- null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ public static final String KURTOSIS = "KURTOSIS";
+ public static final PinotKurtosisAggregateFunction INSTANCE = new
PinotKurtosisAggregateFunction();
+
+ public PinotKurtosisAggregateFunction() {
+ super(KURTOSIS, null, SqlKind.OTHER_FUNCTION, ReturnTypes.DOUBLE,
+ null, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION,
false, false, Optionality.FORBIDDEN);
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
index 8f46a0db4e..17af5a7d6d 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
@@ -23,7 +23,11 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.validate.SqlNameMatchers;
import org.apache.calcite.util.Util;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -46,8 +50,15 @@ public class PinotOperatorTable extends SqlStdOperatorTable {
private static @MonotonicNonNull PinotOperatorTable _instance;
public static final SqlFunction COALESCE = new PinotSqlCoalesceFunction();
- public static final SqlAggFunction BOOL_AND = new
PinotBoolAndAggregateFunction();
- public static final SqlAggFunction BOOL_OR = new
PinotBoolOrAggregateFunction();
+ public static final SqlFunction SKEWNESS_REDUCE = new
SqlFunction("SKEWNESS_REDUCE", SqlKind.OTHER_FUNCTION,
+ ReturnTypes.DOUBLE, null, OperandTypes.BINARY,
SqlFunctionCategory.USER_DEFINED_FUNCTION);
+ public static final SqlFunction KURTOSIS_REDUCE = new
SqlFunction("KURTOSIS_REDUCE", SqlKind.OTHER_FUNCTION,
+ ReturnTypes.DOUBLE, null, OperandTypes.BINARY,
SqlFunctionCategory.USER_DEFINED_FUNCTION);
+
+ public static final SqlAggFunction BOOL_AND =
PinotBoolAndAggregateFunction.INSTANCE;
+ public static final SqlAggFunction BOOL_OR =
PinotBoolOrAggregateFunction.INSTANCE;
+ public static final SqlAggFunction SKEWNESS =
PinotSkewnessAggregateFunction.INSTANCE;
+ public static final SqlAggFunction KURTOSIS =
PinotKurtosisAggregateFunction.INSTANCE;
// TODO: clean up lazy init by using
Suppliers.memorized(this::computeInstance) and make getter wrapped around
// supplier instance. this should replace all lazy init static objects in
the codebase
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSkewnessAggregateFunction.java
similarity index 74%
copy from
pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
copy to
pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSkewnessAggregateFunction.java
index 3610ce0c4d..16857e8568 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSkewnessAggregateFunction.java
@@ -27,11 +27,14 @@ import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.util.Optionality;
-public class PinotBoolAndAggregateFunction extends SqlAggFunction {
+public class PinotSkewnessAggregateFunction extends SqlAggFunction {
- public PinotBoolAndAggregateFunction() {
- super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
- null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ public static final String SKEWNESS = "SKEWNESS";
+ public static final PinotSkewnessAggregateFunction INSTANCE = new
PinotSkewnessAggregateFunction();
+
+ public PinotSkewnessAggregateFunction() {
+ super(SKEWNESS, null, SqlKind.OTHER_FUNCTION, ReturnTypes.DOUBLE,
+ null, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION,
false, false, Optionality.FORBIDDEN);
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 72976185aa..80218c6442 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -165,6 +165,8 @@ public final class RelToStageConverter {
case CHAR:
case VARCHAR:
return DataSchema.ColumnDataType.STRING;
+ case OTHER:
+ return DataSchema.ColumnDataType.OBJECT;
case BINARY:
case VARBINARY:
return DataSchema.ColumnDataType.BYTES;
@@ -174,7 +176,11 @@ public final class RelToStageConverter {
}
public static FieldSpec.DataType convertToFieldSpecDataType(RelDataType
relDataType) {
- return convertToColumnDataType(relDataType).toDataType();
+ DataSchema.ColumnDataType columnDataType =
convertToColumnDataType(relDataType);
+ if (columnDataType == DataSchema.ColumnDataType.OBJECT) {
+ return FieldSpec.DataType.BYTES;
+ }
+ return columnDataType.toDataType();
}
public static PinotDataType convertToPinotDataType(RelDataType relDataType) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 109764bf02..cad627b925 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.BlockDocIdValueSet;
import org.apache.pinot.core.common.BlockMetadata;
import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.common.datablock.DataBlockBuilder;
@@ -87,7 +88,7 @@ public class TransferableBlock implements Block {
if (_container == null) {
switch (_type) {
case ROW:
- _container = DataBlockUtils.extractRows(_dataBlock);
+ _container = DataBlockUtils.extractRows(_dataBlock,
ObjectSerDeUtils::deserialize);
break;
case COLUMNAR:
case METADATA:
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index a82048949f..3182dba91f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -26,7 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlock;
@@ -35,6 +35,7 @@ import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.spi.data.FieldSpec;
@@ -73,13 +74,14 @@ public class AggregateOperator extends MultiStageOperator {
// groupSet has to be a list of InputRef and cannot be null
// TODO: Add these two checks when we confirm we can handle error in
upstream ctor call.
public AggregateOperator(MultiStageOperator inputOperator, DataSchema
dataSchema,
- List<RexExpression> aggCalls, List<RexExpression> groupSet) {
- this(inputOperator, dataSchema, aggCalls, groupSet,
AggregateOperator.Accumulator.MERGERS);
+ List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema
inputSchema) {
+ this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema,
AggregateOperator.Accumulator.MERGERS);
}
@VisibleForTesting
AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema,
- List<RexExpression> aggCalls, List<RexExpression> groupSet, Map<String,
Merger> mergers) {
+ List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema
inputSchema, Map<String,
+ Function<DataSchema.ColumnDataType, Merger>> mergers) {
_inputOperator = inputOperator;
_groupSet = groupSet;
_upstreamErrorBlock = null;
@@ -96,7 +98,7 @@ public class AggregateOperator extends MultiStageOperator {
if (!mergers.containsKey(functionName)) {
throw new IllegalStateException("Unexpected value: " + functionName);
}
- _accumulators[i] = new Accumulator(agg, mergers.get(functionName));
+ _accumulators[i] = new Accumulator(agg, mergers, functionName,
inputSchema);
}
_groupByKeyHolder = new HashMap<>();
@@ -184,6 +186,14 @@ public class AggregateOperator extends MultiStageOperator {
return false;
}
+ private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
+ Object[] keyElements = new Object[groupSet.size()];
+ for (int i = 0; i < groupSet.size(); i++) {
+ keyElements[i] = row[((RexExpression.InputRef)
groupSet.get(i)).getIndex()];
+ }
+ return new Key(keyElements);
+ }
+
private static Object mergeSum(Object left, Object right) {
return ((Number) left).doubleValue() + ((Number) right).doubleValue();
}
@@ -209,37 +219,81 @@ public class AggregateOperator extends MultiStageOperator
{
return ((Boolean) left) || ((Boolean) right);
}
- private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
- Object[] keyElements = new Object[groupSet.size()];
- for (int i = 0; i < groupSet.size(); i++) {
- keyElements[i] = row[((RexExpression.InputRef)
groupSet.get(i)).getIndex()];
+ // NOTE: the below two classes are needed depending on where the
+ // fourth moment is being executed - if the leaf stage gets a
+ // fourth moment pushed down to it, it will return a PinotFourthMoment
+ // as the result of the aggregation. If that is not possible (e.g. the
+ // input to the aggregate requires the result of a JOIN - such as
+ // FOURTHMOMENT(t1.a + t2.a)) then the input to the aggregate in the
+ // intermediate stage is a numeric value.
+
+ private static class MergeFourthMomentNumeric implements Merger {
+
+ @Override
+ public Object merge(Object left, Object right) {
+ ((PinotFourthMoment) left).increment(((Number) right).doubleValue());
+ return left;
+ }
+
+ @Override
+ public Object initialize(Object other) {
+ PinotFourthMoment moment = new PinotFourthMoment();
+ moment.increment(((Number) other).doubleValue());
+ return moment;
}
- return new Key(keyElements);
}
- interface Merger extends BiFunction<Object, Object, Object> {
+ private static class MergeFourthMomentObject implements Merger {
+
+ @Override
+ public Object merge(Object left, Object right) {
+ PinotFourthMoment agg = (PinotFourthMoment) left;
+ agg.combine((PinotFourthMoment) right);
+ return agg;
+ }
+ }
+
+ interface Merger {
+ /**
+ * Initializes the merger based on the first input
+ */
+ default Object initialize(Object other) {
+ return other;
+ }
+
+ /**
+ * Merges the existing aggregate (the result of {@link
#initialize(Object)}) with
+ * the new value coming in (which may be an aggregate in and of itself).
+ */
+ Object merge(Object agg, Object value);
}
private static class Accumulator {
- private static final Map<String, Merger> MERGERS = ImmutableMap
- .<String, Merger>builder()
- .put("SUM", AggregateOperator::mergeSum)
- .put("$SUM", AggregateOperator::mergeSum)
- .put("$SUM0", AggregateOperator::mergeSum)
- .put("MIN", AggregateOperator::mergeMin)
- .put("$MIN", AggregateOperator::mergeMin)
- .put("$MIN0", AggregateOperator::mergeMin)
- .put("MAX", AggregateOperator::mergeMax)
- .put("$MAX", AggregateOperator::mergeMax)
- .put("$MAX0", AggregateOperator::mergeMax)
- .put("COUNT", AggregateOperator::mergeCount)
- .put("BOOL_AND", AggregateOperator::mergeBoolAnd)
- .put("$BOOL_AND", AggregateOperator::mergeBoolAnd)
- .put("$BOOL_AND0", AggregateOperator::mergeBoolAnd)
- .put("BOOL_OR", AggregateOperator::mergeBoolOr)
- .put("$BOOL_OR", AggregateOperator::mergeBoolOr)
- .put("$BOOL_OR0", AggregateOperator::mergeBoolOr)
+ private static final Map<String, Function<DataSchema.ColumnDataType,
Merger>> MERGERS = ImmutableMap
+ .<String, Function<DataSchema.ColumnDataType, Merger>>builder()
+ .put("SUM", cdt -> AggregateOperator::mergeSum)
+ .put("$SUM", cdt -> AggregateOperator::mergeSum)
+ .put("$SUM0", cdt -> AggregateOperator::mergeSum)
+ .put("MIN", cdt -> AggregateOperator::mergeMin)
+ .put("$MIN", cdt -> AggregateOperator::mergeMin)
+ .put("$MIN0", cdt -> AggregateOperator::mergeMin)
+ .put("MAX", cdt -> AggregateOperator::mergeMax)
+ .put("$MAX", cdt -> AggregateOperator::mergeMax)
+ .put("$MAX0", cdt -> AggregateOperator::mergeMax)
+ .put("COUNT", cdt -> AggregateOperator::mergeCount)
+ .put("BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
+ .put("$BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
+ .put("$BOOL_AND0", cdt -> AggregateOperator::mergeBoolAnd)
+ .put("BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
+ .put("$BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
+ .put("$BOOL_OR0", cdt -> AggregateOperator::mergeBoolOr)
+ .put("FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
+ ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
+ .put("$FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
+ ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
+ .put("$FOURTHMOMENT0", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
+ ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
.build();
final int _inputRef;
@@ -247,17 +301,21 @@ public class AggregateOperator extends MultiStageOperator
{
final Map<Key, Object> _results = new HashMap<>();
final Merger _merger;
- Accumulator(RexExpression.FunctionCall aggCall, Merger merger) {
- _merger = merger;
+ Accumulator(RexExpression.FunctionCall aggCall, Map<String,
Function<DataSchema.ColumnDataType, Merger>> merger,
+ String functionName, DataSchema inputSchema) {
// agg function operand should either be a InputRef or a Literal
+ DataSchema.ColumnDataType dataType;
RexExpression rexExpression = toAggregationFunctionOperand(aggCall);
if (rexExpression instanceof RexExpression.InputRef) {
_inputRef = ((RexExpression.InputRef) rexExpression).getIndex();
_literal = null;
+ dataType = inputSchema.getColumnDataType(_inputRef);
} else {
_inputRef = -1;
_literal = ((RexExpression.Literal) rexExpression).getValue();
+ dataType =
DataSchema.ColumnDataType.fromDataType(rexExpression.getDataType(), false);
}
+ _merger = merger.get(functionName).apply(dataType);
}
void accumulate(Key key, Object[] row) {
@@ -268,9 +326,9 @@ public class AggregateOperator extends MultiStageOperator {
Object value = _inputRef == -1 ? _literal : row[_inputRef];
if (currentRes == null) {
- keys.put(key, value);
+ keys.put(key, _merger.initialize(value));
} else {
- Object mergedResult = _merger.apply(currentRes, value);
+ Object mergedResult = _merger.merge(currentRes, value);
_results.put(key, mergedResult);
}
}
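The Merger interface added above splits accumulation into two steps: initialize() seeds the accumulator from the first value, and merge() folds each subsequent value into it. A minimal, self-contained sketch of that contract (hypothetical, not part of the commit), mirroring the mergeSum behavior used for SUM:

public class MergerContractSketch {
  interface Merger {
    // By default the first value is taken as the initial accumulator.
    default Object initialize(Object other) {
      return other;
    }

    Object merge(Object agg, Object value);
  }

  static final class SumMerger implements Merger {
    @Override
    public Object merge(Object agg, Object value) {
      return ((Number) agg).doubleValue() + ((Number) value).doubleValue();
    }
  }

  public static void main(String[] args) {
    Merger merger = new SumMerger();
    Object acc = merger.initialize(1.0); // first value seeds the accumulator
    acc = merger.merge(acc, 2.0);
    acc = merger.merge(acc, 3.0);
    System.out.println(acc); // 6.0
  }
}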
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 365d04f863..e794a84194 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -282,7 +282,11 @@ public class LeafStageTransferableBlockOperator extends
MultiStageOperator {
for (int colId = 0; colId < row.length; colId++) {
Object value = row[colId];
if (value != null) {
- resultRow[colId] = dataSchema.getColumnDataType(colId).convert(value);
+ if (dataSchema.getColumnDataType(colId) ==
DataSchema.ColumnDataType.OBJECT) {
+ resultRow[colId] = value;
+ } else {
+ resultRow[colId] =
dataSchema.getColumnDataType(colId).convert(value);
+ }
}
}
return resultRow;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 57a0949bc6..aaa140463f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -84,7 +84,7 @@ public class PhysicalPlanVisitor implements
StageNodeVisitor<MultiStageOperator,
public MultiStageOperator visitAggregate(AggregateNode node,
PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this,
context);
return new AggregateOperator(nextOperator, node.getDataSchema(),
node.getAggCalls(),
- node.getGroupSet());
+ node.getGroupSet(), node.getInputs().get(0).getDataSchema());
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index e29acb7206..25b7aa5e8b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
@@ -147,7 +148,7 @@ public class QueryDispatcher {
for (int colId = 0; colId < numColumns; colId++) {
nullBitmaps[colId] = dataBlock.getNullRowIds(colId);
}
- List<Object[]> rawRows = DataBlockUtils.extractRows(dataBlock);
+ List<Object[]> rawRows = DataBlockUtils.extractRows(dataBlock, ObjectSerDeUtils::deserialize);
int rowId = 0;
for (Object[] rawRow : rawRows) {
Object[] row = new Object[numColumns];
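
The extra argument is needed because rows reaching the dispatcher may now contain OBJECT columns holding serialized intermediate states; extractRows therefore takes a hook that turns each serialized custom object back into its concrete Java form before the final reduce. Written out explicitly, the hook is roughly the following (the exact extractRows signature and CustomObject handling are assumptions here, shown only for illustration):

// Hypothetical expansion of the method reference used above.
Function<CustomObject, Object> deserializer =
    customObject -> ObjectSerDeUtils.deserialize(customObject);
List<Object[]> rawRows = DataBlockUtils.extractRows(dataBlock, deserializer);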
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 0996b7f4d0..aa7394a6a8 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -69,8 +69,9 @@ public class AggregateOperatorTest {
Mockito.when(_input.nextBlock())
.thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
+ DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
// When:
TransferableBlock block1 = operator.nextBlock(); // build
@@ -89,8 +90,9 @@ public class AggregateOperatorTest {
Mockito.when(_input.nextBlock())
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
// When:
TransferableBlock block = operator.nextBlock();
@@ -113,7 +115,7 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
// When:
TransferableBlock block1 = operator.nextBlock(); // build when reading
NoOp block
@@ -137,7 +139,7 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -163,7 +165,7 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -191,10 +193,11 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
AggregateOperator.Merger merger = Mockito.mock(AggregateOperator.Merger.class);
- Mockito.when(merger.apply(Mockito.any(), Mockito.any())).thenReturn(12d);
+ Mockito.when(merger.merge(Mockito.any(), Mockito.any())).thenReturn(12d);
+ Mockito.when(merger.initialize(Mockito.any())).thenReturn(1d);
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, ImmutableMap.of(
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of(
+ "SUM", cdt -> merger
));
// When:
@@ -203,7 +206,8 @@ public class AggregateOperatorTest {
// Then:
// should call merger twice, one from second row in first block and two from the first row
// in second block
- Mockito.verify(merger, Mockito.times(2)).apply(Mockito.any(), Mockito.any());
+ Mockito.verify(merger, Mockito.times(1)).initialize(Mockito.any());
+ Mockito.verify(merger, Mockito.times(2)).merge(Mockito.any(), Mockito.any());
Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{1, 12d},
"Expected two columns (group by key, agg value)");
}
@@ -213,9 +217,10 @@ public class AggregateOperatorTest {
MultiStageOperator upstreamOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
// Create an aggregation call with sum for first column and group by second column.
RexExpression.FunctionCall agg = getSum(new RexExpression.InputRef(0));
+ DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
AggregateOperator sum0GroupBy1 =
new AggregateOperator(upstreamOperator, OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1),
- Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)));
+ Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema);
TransferableBlock result = sum0GroupBy1.getNextBlock();
while (result.isNoOpBlock()) {
result = sum0GroupBy1.getNextBlock();
@@ -237,9 +242,10 @@ public class AggregateOperatorTest {
);
List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
DataSchema outSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE});
+ DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE});
// When:
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
}
@Test
@@ -256,7 +262,7 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
// When:
TransferableBlock block = operator.nextBlock();
diff --git a/pinot-query-runtime/src/test/resources/queries/Skew.json
b/pinot-query-runtime/src/test/resources/queries/Skew.json
new file mode 100644
index 0000000000..7f16a6970e
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/Skew.json
@@ -0,0 +1,86 @@
+{
+ "skew": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "groupingCol", "type": "STRING"},
+ {"name": "partitionCol", "type": "STRING"},
+ {"name": "val", "type": "INT"}
+ ],
+ "inputs": [
+ ["a", "key1", 1],
+ ["a", "key2", 2],
+ ["a", "key3", 3],
+ ["a", "key1", 4],
+ ["a", "key2", 4],
+ ["a", "key3", 4],
+ ["a", "key1", 7],
+ ["a", "key2", 9],
+ ["b", "key3", 1],
+ ["b", "key1", 2],
+ ["b", "key2", 3],
+ ["b", "key3", 4],
+ ["b", "key1", 4],
+ ["b", "key2", 4],
+ ["b", "key3", 7],
+ ["b", "key1", 9]
+ ],
+ "partitionColumns": [
+ "partitionCol"
+ ]
+ },
+ "tbl2": {
+ "schema": [
+ {"name": "groupingCol", "type": "STRING"},
+ {"name": "partitionCol", "type": "STRING"},
+ {"name": "val", "type": "INT"}
+ ],
+ "inputs": [
+ ["a", "key1", 1],
+ ["a", "key2", 2],
+ ["a", "key3", 3],
+ ["a", "key1", 4],
+ ["a", "key2", 4],
+ ["a", "key3", 4],
+ ["a", "key1", 7],
+ ["a", "key2", 9],
+ ["b", "key3", 1],
+ ["b", "key1", 2],
+ ["b", "key2", 3],
+ ["b", "key3", 4],
+ ["b", "key1", 4],
+ ["b", "key2", 4],
+ ["b", "key3", 7],
+ ["b", "key1", 9]
+ ],
+ "partitionColumns": [
+ "partitionCol"
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "skew for int column",
+ "sql": "SELECT groupingCol, SKEWNESS(val), KURTOSIS(val) FROM {tbl}
GROUP BY groupingCol",
+ "outputs": [
+ ["a", 0.8647536091225356, 0.3561662049861511],
+ ["b", 0.8647536091225356, 0.3561662049861511]
+ ]
+ },
+ {
+ "description": "no group by clause",
+ "sql": "SELECT SKEWNESS(val), KURTOSIS(val) FROM {tbl} WHERE
groupingCol='a'",
+ "outputs": [
+ [0.8647536091225356, 0.3561662049861511]
+ ]
+ },
+ {
+ "sql": "SELECT t1.groupingCol, SKEWNESS(t1.val + t2.val),
KURTOSIS(t1.val + t2.val) FROM {tbl} AS t1 LEFT JOIN {tbl2} AS t2 USING
(partitionCol) GROUP BY t1.groupingCol",
+ "outputs": [
+ ["a", 0.5412443772804422, -0.001438580062540293],
+ ["b", 0.5412443772804422, -0.001438580062540293]
+ ]
+ }
+ ]
+ }
+}
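
To make those expected values concrete: SKEWNESS and KURTOSIS cannot be combined from per-server final results, so both are computed from a shared moment-style intermediate state that merges associatively across leaf and intermediate stages and is only finalized at reduce time. A deliberately simplified illustration of that idea using raw power sums follows; it is not the actual FourthMomentAggregationFunction / InternalReduceFunctions implementation, and Pinot's sample-size corrections are not reproduced here.

// Simplified stand-in for an intermediate "fourth moment" state.
final class MomentState {
  long count;
  double s1, s2, s3, s4; // running power sums: sum(x), sum(x^2), sum(x^3), sum(x^4)

  void add(double x) {
    count++;
    s1 += x;
    s2 += x * x;
    s3 += x * x * x;
    s4 += x * x * x * x;
  }

  // Power sums merge by plain addition, which is what lets the same object be
  // produced on servers, combined in intermediate stages, and finalized once.
  void merge(MomentState other) {
    count += other.count;
    s1 += other.s1;
    s2 += other.s2;
    s3 += other.s3;
    s4 += other.s4;
  }
}

At reduce time the central moments are recovered from the power sums and turned into skewness and kurtosis; the exact formulas, including the bias corrections that yield the expected outputs above, live in the new reduce functions.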
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 8f53122bd4..7c355d0057 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -67,6 +67,7 @@ public enum AggregationFunctionType {
STDDEVSAMP("stdDevSamp"),
SKEWNESS("skewness"),
KURTOSIS("kurtosis"),
+ FOURTHMOMENT("fourthmoment"),
// Geo aggregation functions
STUNION("STUnion"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]