This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7b8c8a0fe8 [multistage] [testing] Filter Operator Unit Test (#9792)
7b8c8a0fe8 is described below
commit 7b8c8a0fe8fca07d0cad66d0bfa4d02d31dc74c6
Author: Yao Liu <[email protected]>
AuthorDate: Tue Nov 15 11:39:33 2022 -0800
[multistage] [testing] Filter Operator Unit Test (#9792)
---
.../pinot/query/planner/logical/RexExpression.java | 6 +
.../pinot/query/planner/logical/StagePlanner.java | 2 +-
.../query/runtime/operator/FilterOperator.java | 14 +-
.../runtime/operator/operands/FilterOperand.java | 52 ++--
.../query/runtime/operator/FilterOperatorTest.java | 288 +++++++++++++++++++++
.../query/runtime/operator/OperatorTestUtil.java | 5 +
6 files changed, 345 insertions(+), 22 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index 3e964e77d0..e9a4a99679 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -203,12 +203,18 @@ public interface RexExpression {
}
class FunctionCall implements RexExpression {
+ // the underlying SQL operator kind of this function.
+ // It can be either a standard SQL operator or an extended function kind.
+ // @see #SqlKind.FUNCTION, #SqlKind.OTHER, #SqlKind.OTHER_FUNCTION
@ProtoProperties
private SqlKind _sqlKind;
+ // the return data type of the function.
@ProtoProperties
private FieldSpec.DataType _dataType;
+ // the name of the SQL function. For standard SqlKind it should match the
SqlKind ENUM name.
@ProtoProperties
private String _functionName;
+ // the list of RexExpressions that represents the operands to the function.
@ProtoProperties
private List<RexExpression> _functionOperands;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 323e7f506b..2d61856c85 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -41,7 +41,7 @@ import org.apache.pinot.query.routing.WorkerManager;
* This class is non-threadsafe. Do not reuse the stage planner for multiple
query plans.
*/
public class StagePlanner {
- private final PlannerContext _plannerContext;
+ private final PlannerContext _plannerContext; // DO NOT REMOVE.
private final WorkerManager _workerManager;
private int _stageIdCounter;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index f639020b25..66956a95ec 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -30,7 +30,19 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
-
+/*
+ FilterOperator apply filter on rows from upstreamOperator.
+ There are three types of filter operands
+ 1) inputRef
+ 2) Literal
+ 3) FunctionOperand
+ All three types' result has to be a boolean to be used to filter rows.
+ FunctionOperand supports,
+ 1) AND, OR, NOT functions to combine operands.
+ 2) Binary Operand: equals, notEquals, greaterThan, greaterThanOrEqual,
lessThan, lessThanOrEqual
+ 3) All boolean scalar functions we have that take tranformOperand.
+ Note: Scalar functions are the ones we have in v1 engine and only do
function name and arg # matching.
+ */
public class FilterOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "FILTER";
private final Operator<TransferableBlock> _upstreamOperator;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
index 3c58d1710a..b152b33b88 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
@@ -41,69 +41,73 @@ public abstract class FilterOperand extends
TransformOperand {
}
}
- public static FilterOperand toFilterOperand(RexExpression.Literal literal) {
+ private static FilterOperand toFilterOperand(RexExpression.Literal literal) {
return new BooleanLiteral(literal);
}
- public static FilterOperand toFilterOperand(RexExpression.InputRef inputRef,
DataSchema dataSchema) {
+ private static FilterOperand toFilterOperand(RexExpression.InputRef
inputRef, DataSchema dataSchema) {
return new BooleanInputRef(inputRef, dataSchema);
}
- public static FilterOperand toFilterOperand(RexExpression.FunctionCall
functionCall, DataSchema dataSchema) {
-
+ private static FilterOperand toFilterOperand(RexExpression.FunctionCall
functionCall, DataSchema dataSchema) {
+ int operandSize = functionCall.getFunctionOperands().size();
+ // TODO: Move these functions out of this class.
switch
(OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) {
case "AND":
+ Preconditions.checkState(operandSize >= 2, "AND takes >=2 argument,
passed in argument size:" + operandSize);
return new And(functionCall.getFunctionOperands(), dataSchema);
case "OR":
+ Preconditions.checkState(operandSize >= 2, "OR takes >=2 argument,
passed in argument size:" + operandSize);
return new Or(functionCall.getFunctionOperands(), dataSchema);
case "NOT":
+ Preconditions.checkState(operandSize == 1, "NOT takes one argument,
passed in argument size:" + operandSize);
return new
Not(toFilterOperand(functionCall.getFunctionOperands().get(0), dataSchema));
case "equals":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) == 0;
+ return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ == 0;
}
};
case "notEquals":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) != 0;
+ return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ != 0;
}
};
case "greaterThan":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) > 0;
+ return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ > 0;
}
};
case "greaterThanOrEqual":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) >= 0;
+ return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ >= 0;
}
};
case "lessThan":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) < 0;
+ return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ < 0;
}
};
case "lessThanOrEqual":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) <= 0;
+ return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ <= 0;
}
};
default:
@@ -119,7 +123,8 @@ public abstract class FilterOperand extends
TransformOperand {
public BooleanFunction(RexExpression.FunctionCall functionCall, DataSchema
dataSchema) {
FunctionOperand func = (FunctionOperand)
TransformOperand.toTransformOperand(functionCall, dataSchema);
- Preconditions.checkState(func.getResultType() ==
DataSchema.ColumnDataType.BOOLEAN);
+ Preconditions.checkState(func.getResultType() ==
DataSchema.ColumnDataType.BOOLEAN,
+ "Expecting boolean result type but got type:" +
func.getResultType());
_func = func;
}
@@ -133,8 +138,9 @@ public abstract class FilterOperand extends
TransformOperand {
private final RexExpression.InputRef _inputRef;
public BooleanInputRef(RexExpression.InputRef inputRef, DataSchema
dataSchema) {
-
Preconditions.checkState(dataSchema.getColumnDataType(inputRef.getIndex())
- == DataSchema.ColumnDataType.BOOLEAN);
+ DataSchema.ColumnDataType inputType =
dataSchema.getColumnDataType(inputRef.getIndex());
+ Preconditions.checkState(inputType == DataSchema.ColumnDataType.BOOLEAN,
+ "Input has to be boolean type but got type:" + inputType);
_inputRef = inputRef;
}
@@ -148,7 +154,8 @@ public abstract class FilterOperand extends
TransformOperand {
private final Object _literalValue;
public BooleanLiteral(RexExpression.Literal literal) {
- Preconditions.checkState(literal.getDataType() ==
FieldSpec.DataType.BOOLEAN);
+ Preconditions.checkState(literal.getDataType() ==
FieldSpec.DataType.BOOLEAN,
+ "Only boolean literal is supported as filter, but got type:" +
literal.getDataType());
_literalValue = literal.getValue();
}
@@ -160,6 +167,7 @@ public abstract class FilterOperand extends
TransformOperand {
private static class And extends FilterOperand {
List<FilterOperand> _childOperands;
+
public And(List<RexExpression> childExprs, DataSchema dataSchema) {
_childOperands = new ArrayList<>(childExprs.size());
for (RexExpression childExpr : childExprs) {
@@ -180,6 +188,7 @@ public abstract class FilterOperand extends
TransformOperand {
private static class Or extends FilterOperand {
List<FilterOperand> _childOperands;
+
public Or(List<RexExpression> childExprs, DataSchema dataSchema) {
_childOperands = new ArrayList<>(childExprs.size());
for (RexExpression childExpr : childExprs) {
@@ -200,6 +209,7 @@ public abstract class FilterOperand extends
TransformOperand {
private static class Not extends FilterOperand {
FilterOperand _childOperand;
+
public Not(FilterOperand childOperand) {
_childOperand = childOperand;
}
@@ -216,6 +226,8 @@ public abstract class FilterOperand extends
TransformOperand {
protected final DataSchema.ColumnDataType _resultType;
public Predicate(List<RexExpression> functionOperands, DataSchema
dataSchema) {
+ Preconditions.checkState(functionOperands.size() == 2,
+ "Expected 2 function ops for Predicate but got:" +
functionOperands.size());
_lhs = TransformOperand.toTransformOperand(functionOperands.get(0),
dataSchema);
_rhs = TransformOperand.toTransformOperand(functionOperands.get(1),
dataSchema);
if (_lhs._resultType != null && _lhs._resultType !=
DataSchema.ColumnDataType.OBJECT) {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
new file mode 100644
index 0000000000..7b1f5bce5f
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class FilterOperatorTest {
+ private AutoCloseable _mocks;
+ @Mock
+ private Operator<TransferableBlock> _upstreamOperator;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
+
+ @Test
+ public void shouldPropagateUpstreamErrorBlock() {
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new
Exception("filterError")));
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+ DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN
+ });
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock errorBlock = op.getNextBlock();
+ Assert.assertTrue(errorBlock.isErrorBlock());
+ DataBlock error = errorBlock.getDataBlock();
+
Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError"));
+ }
+
+ @Test
+ public void shouldPropagateUpstreamEOS() {
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertTrue(dataBlock.isEndOfStreamBlock());
+ }
+
+ @Test
+ public void shouldPropagateUpstreamNoop() {
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertTrue(dataBlock.isNoOpBlock());
+ }
+
+ @Test
+ public void shouldHandleTrueBooleanLiteralFilter() {
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new
Object[]{1}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertEquals(result.get(0)[0], 0);
+ Assert.assertEquals(result.get(1)[0], 1);
+ }
+
+ @Test
+ public void shouldHandleFalseBooleanLiteralFilter() {
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, false);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertTrue(result.isEmpty());
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*boolean literal.*")
+ public void shouldThrowOnNonBooleanTypeBooleanLiteral() {
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.STRING, "false");
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*Input has to be "
+ + "boolean type.*")
+ public void shouldThrowOnNonBooleanTypeInputRef() {
+ RexExpression ref0 = new RexExpression.InputRef(0);
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
ref0);
+ }
+
+ @Test
+ public void shouldHandleBooleanInputRef() {
+ RexExpression ref1 = new RexExpression.InputRef(1);
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol", "boolCol"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.BOOLEAN
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true},
new Object[]{2, false}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
ref1);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0)[0], 1);
+ Assert.assertEquals(result.get(0)[1], true);
+ }
+
+ @Test
+ public void shouldHandleAndFilter() {
+ DataSchema inputSchema = new DataSchema(new String[]{"boolCol0",
"boolCol1"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN
+ });
+ Mockito.when(_upstreamOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new
Object[]{false, false},
+ new Object[]{true, false}));
+ RexExpression.FunctionCall andCall = new
RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND",
+ ImmutableList.of(new RexExpression.InputRef(0), new
RexExpression.InputRef(1)));
+
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
andCall);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0)[0], true);
+ Assert.assertEquals(result.get(0)[1], true);
+ }
+
+ @Test
+ public void shouldHandleOrFilter() {
+ DataSchema inputSchema = new DataSchema(new String[]{"boolCol0",
"boolCol1"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN
+ });
+ Mockito.when(_upstreamOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new
Object[]{false, false},
+ new Object[]{true, false}));
+ RexExpression.FunctionCall orCall = new
RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR",
+ ImmutableList.of(new RexExpression.InputRef(0), new
RexExpression.InputRef(1)));
+
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
orCall);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertEquals(result.get(0)[0], true);
+ Assert.assertEquals(result.get(0)[1], true);
+ Assert.assertEquals(result.get(1)[0], true);
+ Assert.assertEquals(result.get(1)[1], false);
+ }
+
+ @Test
+ public void shouldHandleNotFilter() {
+ DataSchema inputSchema = new DataSchema(new String[]{"boolCol0",
"boolCol1"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN
+ });
+ Mockito.when(_upstreamOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new
Object[]{false, false},
+ new Object[]{true, false}));
+ RexExpression.FunctionCall notCall = new
RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT",
+ ImmutableList.of(new RexExpression.InputRef(0)));
+
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
notCall);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0)[0], false);
+ Assert.assertEquals(result.get(0)[1], false);
+ }
+
+ @Test
+ public void shouldHandleGreaterThanFilter() {
+ DataSchema inputSchema = new DataSchema(new String[]{"int0", "int1"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, 2},
new Object[]{3, 2}, new Object[]{1, 1}));
+ RexExpression.FunctionCall greaterThan =
+ new RexExpression.FunctionCall(SqlKind.GREATER_THAN,
FieldSpec.DataType.BOOLEAN, "greaterThan",
+ ImmutableList.of(new RexExpression.InputRef(0), new
RexExpression.InputRef(1)));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
greaterThan);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ List<Object[]> expectedResult = ImmutableList.of(new Object[]{3, 2});
+ Assert.assertEquals(result.size(), expectedResult.size());
+ Assert.assertEquals(result.get(0), expectedResult.get(0));
+ }
+
+ @Test
+ public void shouldHandleBooleanFunction() {
+ DataSchema inputSchema = new DataSchema(new String[]{"string1"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new
Object[]{"starTree"}, new Object[]{"treeStar"}));
+ RexExpression.FunctionCall startsWith =
+ new RexExpression.FunctionCall(SqlKind.OTHER,
FieldSpec.DataType.BOOLEAN, "startsWith",
+ ImmutableList.of(new RexExpression.InputRef(0),
+ new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
startsWith);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ List<Object[]> expectedResult = ImmutableList.of(new Object[]{"starTree"});
+ Assert.assertEquals(result.size(), expectedResult.size());
+ Assert.assertEquals(result.get(0), expectedResult.get(0));
+ }
+
+ @Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = ".*Cannot find function "
+ + "with Name: startsWithError.*")
+ public void shouldThrowOnUnfoundFunction() {
+ DataSchema inputSchema = new DataSchema(new String[]{"string1"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new
Object[]{"starTree"}, new Object[]{"treeStar"}));
+ RexExpression.FunctionCall startsWith =
+ new RexExpression.FunctionCall(SqlKind.OTHER,
FieldSpec.DataType.BOOLEAN, "startsWithError",
+ ImmutableList.of(new RexExpression.InputRef(0),
+ new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
startsWith);
+ }
+}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 86b65a1686..0537c67ca9 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
import java.util.Arrays;
import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -57,4 +58,8 @@ public class OperatorTestUtil {
public static DataSchema getDataSchema(String operatorName) {
return MOCK_OPERATOR_FACTORY.getDataSchema(operatorName);
}
+
+ public static TransferableBlock block(DataSchema schema, Object[]... rows) {
+ return new TransferableBlock(Arrays.asList(rows), schema,
DataBlock.Type.ROW);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]