[ 
https://issues.apache.org/jira/browse/PHOENIX-538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14512308#comment-14512308
 ] 

ASF GitHub Bot commented on PHOENIX-538:
----------------------------------------

Github user chrajeshbabu commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/77#discussion_r29098678
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/expression/function/UDFExpression.java
 ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.expression.function;
    +
    +import static org.apache.phoenix.query.QueryServices.DYNAMIC_JARS_DIR_KEY;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.List;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.locks.Lock;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.hadoop.hbase.util.DynamicClassLoader;
    +import org.apache.hadoop.hbase.util.KeyLocker;
    +import org.apache.hadoop.io.WritableUtils;
    +import org.apache.phoenix.compile.KeyPart;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.visitor.ExpressionVisitor;
    +import org.apache.phoenix.parse.PFunction;
    +import org.apache.phoenix.schema.PName;
    +import org.apache.phoenix.schema.PNameFactory;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.schema.types.PDataType;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.MapMaker;
    +
    +public class UDFExpression extends ScalarFunction {
    +    
    +    private static Configuration config = HBaseConfiguration.create();
    +
    +    private static final ConcurrentMap<PName, DynamicClassLoader> 
tenantIdSpecificCls =
    +            new MapMaker().concurrencyLevel(3).weakValues().makeMap();
    +
    +    private static final ConcurrentMap<String, DynamicClassLoader> 
pathSpecificCls =
    +            new MapMaker().concurrencyLevel(3).weakValues().makeMap();
    +
    +    public static final Log LOG = LogFactory.getLog(UDFExpression.class);
    +    
    +    /**
    +     * A locker used to synchronize class loader initialization per tenant 
id.
    +     */
    +    private static final KeyLocker<String> locker = new 
KeyLocker<String>();
    +
    +    /**
    +     * A locker used to synchronize class loader initialization per jar 
path.
    +     */
    +    private static final KeyLocker<String> pathLocker = new 
KeyLocker<String>();
    +
    +    private PName tenantId;
    +    private String functionClassName;
    +    private String jarPath;
    +    private ScalarFunction udfFunction;
    +    
    +    public UDFExpression() {
    +    }
    +
    +    public UDFExpression(List<Expression> children,PFunction functionInfo) 
{
    +        super(children);
    +        this.tenantId =
    +                functionInfo.getTenantId() == null ? PName.EMPTY_NAME : 
functionInfo.getTenantId();
    +        this.functionClassName = functionInfo.getClassName();
    +        this.jarPath = functionInfo.getJarPath();
    +        constructUDFFunction();
    +    }
    +
    +    @Override
    +    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
    +        return udfFunction.evaluate(tuple, ptr);
    +    }
    +
    +    @Override
    +    public <T> T accept(ExpressionVisitor<T> visitor) {
    +        return udfFunction.accept(visitor);
    +    }
    +
    +    @Override
    +    public PDataType getDataType() {
    +        return udfFunction.getDataType();
    +    }
    +
    +    @Override
    +    public String getName() {
    +        return udfFunction.getName();
    +    }
    +
    +    @Override
    +    public OrderPreserving preservesOrder() {
    +        return udfFunction.preservesOrder();
    +    }
    +
    +    @Override
    +    public KeyPart newKeyPart(KeyPart childPart) {
    +        return udfFunction.newKeyPart(childPart);
    +    }
    +
    +    @Override
    +    public int getKeyFormationTraversalIndex() {
    +        return udfFunction.getKeyFormationTraversalIndex();
    +    }
    +
    +    @Override
    +    public void write(DataOutput output) throws IOException {
    +        super.write(output);
    +        WritableUtils.writeString(output, tenantId.getString());
    +        WritableUtils.writeString(output, this.functionClassName);
    +        if(this.jarPath == null) {
    +            WritableUtils.writeString(output, "");
    +        } else {
    +            WritableUtils.writeString(output, this.jarPath);
    +        }
    +    }
    +    
    +    @Override
    +    public void readFields(DataInput input) throws IOException {
    +        super.readFields(input);
    +        this.tenantId = 
PNameFactory.newName(WritableUtils.readString(input));
    +        this.functionClassName = WritableUtils.readString(input);
    +        String str = WritableUtils.readString(input);
    +        this.jarPath = str.length() == 0 ? null: str;
    +        constructUDFFunction();
    +    }
    +
    +    private void constructUDFFunction() {
    +        try {
    +            DynamicClassLoader classLoader = getClassLoader(this.tenantId, 
this.jarPath);
    +            Class<?> clazz = classLoader.loadClass(this.functionClassName);
    +            Constructor<?> constructor = clazz.getConstructor(List.class);
    +            udfFunction = 
(ScalarFunction)constructor.newInstance(this.children);
    +        } catch (ClassNotFoundException | NoSuchMethodException | 
SecurityException
    +                | InstantiationException | IllegalAccessException | 
IllegalArgumentException
    +                | InvocationTargetException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    public static DynamicClassLoader getClassLoader(final PName tenantId, 
final String jarPath) {
    +        DynamicClassLoader cl = tenantIdSpecificCls.get(tenantId);
    +        String parent = null;
    +        if (cl != null) return cl;
    +        if(jarPath != null && !jarPath.isEmpty()) {
    +            cl = pathSpecificCls.get(jarPath);
    +            if (cl != null) return cl;
    +            Path path = new Path(jarPath);
    +            if(jarPath.endsWith(".jar")) {
    +                parent = path.getParent().toString();
    +            } else {
    +                parent = path.toString();
    +            }
    +        }
    +        if (jarPath == null || jarPath.isEmpty() || 
config.get(DYNAMIC_JARS_DIR_KEY) != null
    +                && (parent != null && 
parent.equals(config.get(DYNAMIC_JARS_DIR_KEY)))) {
    +            Lock lock = locker.acquireLock(tenantId.getString());
    --- End diff --
    
    Sure, then I will remove it.


> Support UDFs
> ------------
>
>                 Key: PHOENIX-538
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-538
>             Project: Phoenix
>          Issue Type: Task
>            Reporter: James Taylor
>            Assignee: Rajeshbabu Chintaguntla
>             Fix For: 5.0.0, 4.4.0
>
>         Attachments: PHOENIX-538-wip.patch, PHOENIX-538_v1.patch, 
> PHOENIX-538_v2.patch, PHOENIX-538_v3.patch, PHOENIX-538_v4.patch, 
> PHOENIX-538_v5.patch, PHOENIX-538_v6.patch, PHOENIX-538_v6.patch
>
>
> Phoenix allows built-in functions to be added (as described 
> [here](http://phoenix-hbase.blogspot.com/2013/04/how-to-add-your-own-built-in-function.html))
>  with the restriction that they must be in the phoenix jar. We should improve 
> on this and allow folks to declare new functions through a CREATE FUNCTION 
> command like this:
>       CREATE FUNCTION mdHash(anytype)
>       RETURNS binary(16)
>       LOCATION 'hdfs://path-to-my-jar' 'com.me.MDHashFunction'
> Since HBase supports loading jars dynamically, this would not be too 
> difficult. The function implementation class would be required to extend our 
> ScalarFunction base class. Here's how I could see it being implemented:
> * modify the phoenix grammar to support the new CREATE FUNCTION syntax
> * create a new UDFParseNode class to capture the parse state
> * add a new method to the MetaDataProtocol interface
> * add a new method in ConnectionQueryServices to invoke the MetaDataProtocol 
> method
> * add a new method in MetaDataClient to invoke the ConnectionQueryServices 
> method
> * persist functions in a new "SYSTEM.FUNCTION" table
> * add a new client-side representation to cache functions called PFunction
> * modify ColumnResolver to dynamically resolve a function in the same way we 
> dynamically resolve and load a table
> * create and register a new ExpressionType called UDFExpression
> * at parse time, check for the function name in the built-in list first (as 
> is currently done) and, if not found, in the PFunction cache. If not found 
> there, then use the new UDFExpression as a placeholder and have the 
> ColumnResolver attempt to resolve it at compile time and throw an error if 
> unsuccessful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to