This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d5740856f4 Support conjugates for scalar functions, add more scalar
functions (#8582)
d5740856f4 is described below
commit d5740856f481c34808b7027c0fd1b6bff068d04d
Author: Saurabh Dubey <[email protected]>
AuthorDate: Wed May 4 23:58:58 2022 +0530
Support conjugates for scalar functions, add more scalar functions (#8582)
This PR adds support for using complex scalar function expressions in
filterConfig. In addition to the common comparison functions, support for
AND, OR and NOT has been added.
---
.../function/scalar/ComparisonFunctions.java | 65 +++++++++
.../common/function/scalar/ObjectFunctions.java | 37 ++++++
.../local/function/InbuiltFunctionEvaluator.java | 109 ++++++++++++++--
.../recordtransformer/RecordTransformerTest.java | 145 +++++++++++++++++++++
4 files changed, 346 insertions(+), 10 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ComparisonFunctions.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ComparisonFunctions.java
new file mode 100644
index 0000000000..781c949e6e
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ComparisonFunctions.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.function.scalar;
+
+import org.apache.pinot.spi.annotations.ScalarFunction;
+
+/**
+ * Comparison functions over {@code double} operands, each exposed via the {@link ScalarFunction} annotation.
+ *
+ * <p>Ordering comparisons use the plain Java operators. Equality is checked within an absolute tolerance of
+ * {@link #DOUBLE_COMPARISON_TOLERANCE} instead of exact comparison.
+ */
+public class ComparisonFunctions {
+
+  /** Absolute difference below which two doubles are treated as equal. */
+  private static final double DOUBLE_COMPARISON_TOLERANCE = 1e-7d;
+
+  private ComparisonFunctions() {
+  }
+
+  /** Returns {@code true} when {@code a} is strictly greater than {@code b}. */
+  @ScalarFunction
+  public static boolean greaterThan(double a, double b) {
+    return a > b;
+  }
+
+  /** Returns {@code true} when {@code a} is greater than or exactly equal to {@code b}. */
+  @ScalarFunction
+  public static boolean greaterThanOrEqual(double a, double b) {
+    return a >= b;
+  }
+
+  /** Returns {@code true} when {@code a} is strictly less than {@code b}. */
+  @ScalarFunction
+  public static boolean lessThan(double a, double b) {
+    return a < b;
+  }
+
+  /** Returns {@code true} when {@code a} is less than or exactly equal to {@code b}. */
+  @ScalarFunction
+  public static boolean lessThanOrEqual(double a, double b) {
+    return a <= b;
+  }
+
+  /**
+   * Returns {@code true} when {@code a} and {@code b} are not equal within the tolerance.
+   *
+   * <p>Defined as the exact negation of {@link #equals(double, double)} so that exactly one of the two holds for
+   * every input, including NaN operands (which are never equal to anything) and infinities.
+   */
+  @ScalarFunction
+  public static boolean notEquals(double a, double b) {
+    return !equals(a, b);
+  }
+
+  /**
+   * Returns {@code true} when {@code a} and {@code b} are identical or differ by less than
+   * {@link #DOUBLE_COMPARISON_TOLERANCE}. Returns {@code false} if either operand is NaN.
+   */
+  @ScalarFunction
+  public static boolean equals(double a, double b) {
+    // Exact match first: the difference of two same-signed infinities is NaN, which would fail the tolerance check.
+    // Otherwise compare within a tolerance to avoid approximation errors.
+    return a == b || Math.abs(a - b) < DOUBLE_COMPARISON_TOLERANCE;
+  }
+
+  /**
+   * Returns {@code true} when {@code val} lies in the closed range {@code [a, b]}.
+   *
+   * <p>Both bounds are inclusive, matching the semantics of SQL {@code BETWEEN a AND b}. The result is
+   * {@code false} when {@code a > b} or when any operand is NaN.
+   */
+  @ScalarFunction
+  public static boolean between(double val, double a, double b) {
+    return val >= a && val <= b;
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ObjectFunctions.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ObjectFunctions.java
new file mode 100644
index 0000000000..2c719a4cfb
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ObjectFunctions.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.function.scalar;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.annotations.ScalarFunction;
+
+/**
+ * Null-check functions over arbitrary values. Both are declared with {@code nullableParameters = true} and accept
+ * a null argument.
+ */
+public class ObjectFunctions {
+  private ObjectFunctions() {
+  }
+
+  /** Returns {@code true} if and only if {@code obj} is null. */
+  @ScalarFunction(nullableParameters = true)
+  public static boolean isNull(@Nullable Object obj) {
+    return null == obj;
+  }
+
+  /** Returns {@code true} if and only if {@code obj} is not null. */
+  @ScalarFunction(nullableParameters = true)
+  public static boolean isNotNull(@Nullable Object obj) {
+    return obj != null;
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
index a894e010c6..18e1ee5031 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.function;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
@@ -68,17 +69,27 @@ public class InbuiltFunctionEvaluator implements
FunctionEvaluator {
childNodes[i] = planExecution(arguments.get(i));
}
String functionName = function.getFunctionName();
- FunctionInfo functionInfo =
FunctionRegistry.getFunctionInfo(functionName, numArguments);
- if (functionInfo == null) {
- if (FunctionRegistry.containsFunction(functionName)) {
- throw new IllegalStateException(
- String.format("Unsupported function: %s with %d parameters",
functionName, numArguments));
- } else {
- throw new IllegalStateException(
- String.format("Unsupported function: %s not found",
functionName));
- }
+ switch (functionName) {
+ case "and":
+ return new AndExecutionNode(childNodes);
+ case "or":
+ return new OrExecutionNode(childNodes);
+ case "not":
+ Preconditions.checkState(numArguments == 1, "NOT function expects
1 argument, got: %s", numArguments);
+ return new NotExecutionNode(childNodes[0]);
+ default:
+ FunctionInfo functionInfo =
FunctionRegistry.getFunctionInfo(functionName, numArguments);
+ if (functionInfo == null) {
+ if (FunctionRegistry.containsFunction(functionName)) {
+ throw new IllegalStateException(
+ String.format("Unsupported function: %s with %d
parameters", functionName, numArguments));
+ } else {
+ throw new IllegalStateException(
+ String.format("Unsupported function: %s not found",
functionName));
+ }
+ }
+ return new FunctionExecutionNode(functionInfo, childNodes);
}
- return new FunctionExecutionNode(functionInfo, childNodes);
default:
throw new IllegalStateException();
}
@@ -106,6 +117,84 @@ public class InbuiltFunctionEvaluator implements
FunctionEvaluator {
Object execute(Object[] values);
}
+ /**
+  * Node that evaluates its single child, casts the result to {@link Boolean} and returns its logical negation.
+  */
+ private static class NotExecutionNode implements ExecutableNode {
+   private final ExecutableNode _argumentNode;
+
+   NotExecutionNode(ExecutableNode argumentNode) {
+     _argumentNode = argumentNode;
+   }
+
+   @Override
+   public Object execute(GenericRow row) {
+     boolean operand = (Boolean) _argumentNode.execute(row);
+     return !operand;
+   }
+
+   @Override
+   public Object execute(Object[] values) {
+     boolean operand = (Boolean) _argumentNode.execute(values);
+     return !operand;
+   }
+ }
+
+ /**
+  * Node that ORs its children. Children are evaluated in array order; the first one whose Boolean result is true
+  * makes the node return true without evaluating the remaining children. Returns false when no child yields true,
+  * which includes an empty child array.
+  */
+ private static class OrExecutionNode implements ExecutableNode {
+   private final ExecutableNode[] _argumentNodes;
+
+   OrExecutionNode(ExecutableNode[] argumentNodes) {
+     _argumentNodes = argumentNodes;
+   }
+
+   @Override
+   public Object execute(GenericRow row) {
+     for (int i = 0; i < _argumentNodes.length; i++) {
+       boolean childResult = (Boolean) _argumentNodes[i].execute(row);
+       if (childResult) {
+         return true;
+       }
+     }
+     return false;
+   }
+
+   @Override
+   public Object execute(Object[] values) {
+     for (int i = 0; i < _argumentNodes.length; i++) {
+       boolean childResult = (Boolean) _argumentNodes[i].execute(values);
+       if (childResult) {
+         return true;
+       }
+     }
+     return false;
+   }
+ }
+
+ /**
+  * Node that ANDs its children. Children are evaluated in array order; the first one whose Boolean result is false
+  * makes the node return false without evaluating the remaining children. Returns true when every child yields
+  * true, which includes an empty child array.
+  */
+ private static class AndExecutionNode implements ExecutableNode {
+   private final ExecutableNode[] _argumentNodes;
+
+   AndExecutionNode(ExecutableNode[] argumentNodes) {
+     _argumentNodes = argumentNodes;
+   }
+
+   @Override
+   public Object execute(GenericRow row) {
+     for (int i = 0; i < _argumentNodes.length; i++) {
+       boolean childResult = (Boolean) _argumentNodes[i].execute(row);
+       if (!childResult) {
+         return false;
+       }
+     }
+     return true;
+   }
+
+   @Override
+   public Object execute(Object[] values) {
+     for (int i = 0; i < _argumentNodes.length; i++) {
+       boolean childResult = (Boolean) _argumentNodes[i].execute(values);
+       if (!childResult) {
+         return false;
+       }
+     }
+     return true;
+   }
+ }
+
private static class FunctionExecutionNode implements ExecutableNode {
final FunctionInvoker _functionInvoker;
final FunctionInfo _functionInfo;
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index 195cc0cd7f..f76551c0f4 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -81,6 +81,7 @@ public class RecordTransformerTest {
record.putValue("svStringWithLengthLimit", "123");
record.putValue("mvString1", new Object[]{"123", 123, 123L, 123f, 123.0});
record.putValue("mvString2", new Object[]{123, 123L, 123f, 123.0, "123"});
+ record.putValue("svNullString", null);
return record;
}
@@ -178,6 +179,150 @@ public class RecordTransformerTest {
}
}
+ /**
+  * For each comparison filter expression below, configures it as the table's FilterConfig, runs a
+  * FilterTransformer over a fresh record from getRecord(), and asserts that the record's field map then contains
+  * GenericRow.SKIP_RECORD_KEY.
+  */
+ @Test
+ public void testScalarOps() {
+   TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+   // Each expression is expected to evaluate to true, so the record is expected to be filtered
+   String[] filterFunctions = {
+       "svInt = 123",
+       "svDouble > 120",
+       "svDouble >= 123",
+       "svDouble < 200",
+       "svDouble <= 123",
+       "svLong != 125",
+       "svLong = 123",
+       "between(svLong, 100, 125)"
+   };
+   for (String filterFunction : filterFunctions) {
+     GenericRow genericRow = getRecord();
+     tableConfig.setIngestionConfig(
+         new IngestionConfig(null, null, new FilterConfig(filterFunction), null, null));
+     RecordTransformer transformer = new FilterTransformer(tableConfig);
+     transformer.transform(genericRow);
+     Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+   }
+ }
+
+ /**
+  * Builds a row in which "svNullString" and "mvNullFloat" are explicitly set to null, while "svInt" and "mvLong"
+  * hold non-null values.
+  */
+ private GenericRow getNullColumnsRecord() {
+   GenericRow row = new GenericRow();
+   row.putValue("svNullString", null);
+   row.putValue("svInt", (byte) 123);
+   row.putValue("mvLong", Collections.singletonList(123f));
+   row.putValue("mvNullFloat", null);
+   return row;
+ }
+
+ /**
+  * For each null-check filter expression below, configures it as the table's FilterConfig, runs a
+  * FilterTransformer over a fresh record from getNullColumnsRecord(), and asserts that the record's field map then
+  * contains GenericRow.SKIP_RECORD_KEY.
+  */
+ @Test
+ public void testObjectOps() {
+   TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+   // Each expression is expected to evaluate to true, so the record is expected to be filtered
+   String[] filterFunctions = {
+       "svNullString is null",
+       "svInt is not null",
+       "mvLong is not null",
+       "mvNullFloat is null"
+   };
+   for (String filterFunction : filterFunctions) {
+     GenericRow genericRow = getNullColumnsRecord();
+     tableConfig.setIngestionConfig(
+         new IngestionConfig(null, null, new FilterConfig(filterFunction), null, null));
+     RecordTransformer transformer = new FilterTransformer(tableConfig);
+     transformer.transform(genericRow);
+     Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+   }
+ }
+
+ /**
+  * For each AND / OR filter expression below, configures it as the table's FilterConfig, runs a FilterTransformer
+  * over a fresh record from getRecord(), and asserts that the record's field map then contains
+  * GenericRow.SKIP_RECORD_KEY.
+  */
+ @Test
+ public void testLogicalScalarOps() {
+   TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+   // Each expression is expected to evaluate to true, so the record is expected to be filtered
+   String[] filterFunctions = {
+       "svInt = 123 AND svDouble <= 200",
+       "svInt = 125 OR svLong <= 200"
+   };
+   for (String filterFunction : filterFunctions) {
+     GenericRow genericRow = getRecord();
+     tableConfig.setIngestionConfig(
+         new IngestionConfig(null, null, new FilterConfig(filterFunction), null, null));
+     RecordTransformer transformer = new FilterTransformer(tableConfig);
+     transformer.transform(genericRow);
+     Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+   }
+ }
+
@Test
public void testNullValueTransformer() {
RecordTransformer transformer = new NullValueTransformer(TABLE_CONFIG,
SCHEMA);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]