This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch generic-transform-function in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 9d1660092484d3e0d86dc21478fd3720877e35eb Author: kishoreg <g.kish...@gmail.com> AuthorDate: Sat May 23 10:49:34 2020 -0700 Adding support to invoke any scalar function via GenericTransformFunction --- .../pinot/common/function/FunctionRegistry.java | 1 + .../pinot/common/function/StringFunctions.java | 33 +++++ .../function/GenericTransformFunction.java | 137 +++++++++++++++++++++ .../function/TransformFunctionFactory.java | 20 ++- 4 files changed, 188 insertions(+), 3 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java index a502bf2..87b79e2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java @@ -63,6 +63,7 @@ public class FunctionRegistry { static { try { + FunctionRegistry.registerFunction(String.class.getDeclaredMethod("reverse", String.class)); FunctionRegistry.registerFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochSeconds", Long.class)); FunctionRegistry.registerFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochMinutes", Long.class)); FunctionRegistry.registerFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochHours", Long.class)); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/StringFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/StringFunctions.java new file mode 100644 index 0000000..b7e92d1 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/StringFunctions.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.function; + +import java.util.concurrent.TimeUnit; + + +/** + * + */ +public class StringFunctions { + + static String reverse(String input) { + return new StringBuilder(input).reverse().toString(); + } + +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GenericTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GenericTransformFunction.java new file mode 100644 index 0000000..8fe5987 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GenericTransformFunction.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.transform.function; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.function.FunctionInfo; +import org.apache.pinot.common.function.FunctionInvoker; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.spi.data.FieldSpec; + + +public class GenericTransformFunction extends BaseTransformFunction { + + private FunctionInfo _info; + FunctionInvoker _functionInvoker; + String _name; + Object[] _args; + List<Integer> _nonLiteralArgIndices; + List<FieldSpec.DataType> _nonLiteralArgType; + List<TransformFunction> _nonLiteralTransformFunction; + String[] _stringResult; + + public GenericTransformFunction() { + _nonLiteralArgIndices = new ArrayList(); + _nonLiteralArgType = new ArrayList(); + } + + @Override + public String getName() { + return _name; + } + + public void setFunction(String functionName, FunctionInfo info) + throws Exception { + _name = functionName; + _info = info; + _functionInvoker = new FunctionInvoker(info); + } + + @Override + public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) { + //assert method.args.length == arguments.size + _args = new Object[arguments.size()]; + + for (int i = 0; i < arguments.size(); i++) { + TransformFunction function = arguments.get(i); + if (function instanceof LiteralTransformFunction) { + String literal = ((LiteralTransformFunction) function).getLiteral(); + //convert String to the right dataType based on method param + + Class paramType = _functionInvoker.getParameterTypes()[i]; + switch (paramType.getTypeName()) { + case "Integer": + _args[i] = Integer.parseInt(literal); + break; + case "String": + _args[i] = literal; + break; + //add other types and throw exception for non primitive/string classes + } + } else { + _nonLiteralArgIndices.add(i); + Class paramType = _functionInvoker.getParameterTypes()[i]; + //find the right pinot data Type + switch (paramType.getTypeName()) { + case "Integer": + _nonLiteralArgType.add(FieldSpec.DataType.INT); + break; + case "String": + _nonLiteralArgType.add(FieldSpec.DataType.STRING); + break; + //todo add other types + } + } + } + } + + @Override + public TransformResultMetadata getResultMetadata() { + return STRING_SV_NO_DICTIONARY_METADATA; + } + + @SuppressWarnings("Duplicates") + @Override + public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) { + if (_stringResult == null) { + _stringResult = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL]; + } + + int length = projectionBlock.getNumDocs(); + int numNonLiteralArgs = _nonLiteralArgIndices.size(); + Object[][] nonLiteralBlockValues = new Object[numNonLiteralArgs][]; + + for (int i = 0; i < numNonLiteralArgs; i++) { + TransformFunction transformFunc = _nonLiteralTransformFunction.get(i); + FieldSpec.DataType returnType = _nonLiteralArgType.get(i); + switch (returnType) { + case STRING: + nonLiteralBlockValues[i] = transformFunc.transformToStringValuesSV(projectionBlock); + //todo handle other types + } + } + + //now invoke the actual function + for (int i = 0; i < length; i++) { + for (int k = 0; k < numNonLiteralArgs; k++) { + _args[_nonLiteralArgIndices.get(k)] = nonLiteralBlockValues[k][i]; + } + _stringResult[i] = (String) _functionInvoker.process(_args); + } + return _stringResult; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java index e9bd6bc..aac1938 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.pinot.common.function.FunctionInfo; +import org.apache.pinot.common.function.FunctionRegistry; import org.apache.pinot.common.function.TransformFunctionType; import org.apache.pinot.common.request.transform.TransformExpressionTree; import org.apache.pinot.core.common.DataSource; @@ -65,7 +67,8 @@ public class TransformFunctionFactory { put(TransformFunctionType.SQRT.getName().toLowerCase(), SqrtTransformFunction.class); put(TransformFunctionType.CAST.getName().toLowerCase(), CastTransformFunction.class); - put(TransformFunctionType.JSONEXTRACTSCALAR.getName().toLowerCase(), JsonExtractScalarTransformFunction.class); + put(TransformFunctionType.JSONEXTRACTSCALAR.getName().toLowerCase(), + JsonExtractScalarTransformFunction.class); put(TransformFunctionType.JSONEXTRACTKEY.getName().toLowerCase(), JsonExtractKeyTransformFunction.class); put(TransformFunctionType.TIMECONVERT.getName().toLowerCase(), TimeConversionTransformFunction.class); put(TransformFunctionType.DATETIMECONVERT.getName().toLowerCase(), DateTimeConversionTransformFunction.class); @@ -112,13 +115,24 @@ public class TransformFunctionFactory { switch (expression.getExpressionType()) { case FUNCTION: String functionName = expression.getValue(); - Class<? extends TransformFunction> transformFunctionClass = TRANSFORM_FUNCTION_MAP.get(functionName); + Class<? extends TransformFunction> transformFunctionClass; + FunctionInfo functionInfo = null; + if (FunctionRegistry.containsFunctionByName(functionName)) { + transformFunctionClass = GenericTransformFunction.class; + functionInfo = FunctionRegistry.getFunctionByName(functionName); + } else { + transformFunctionClass = TRANSFORM_FUNCTION_MAP.get(functionName); + } + if (transformFunctionClass == null) { throw new BadQueryRequestException("Unsupported transform function: " + functionName); } try { transformFunction = transformFunctionClass.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { + if (functionInfo != null) { + ((GenericTransformFunction) transformFunction).setFunction(functionName, functionInfo); + } + } catch (Exception e) { throw new RuntimeException("Caught exception while instantiating transform function: " + functionName, e); } List<TransformExpressionTree> children = expression.getChildren(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org