Thanks James. It worked.

Can you please provide me pointers to write UDAFs in phoenix like we
have GenericUDAFEvaluator for writing Hive UDAFs.
I am looking for a tutorial like
for phoenix.


On Sun, Oct 2, 2016 at 7:03 AM, James Taylor <> wrote:

> Hi Akhil,
> You want to create an Array, convert it to its byte[] representation, and
> set the ptr argument to point to it. Take a look at ArrayIT for examples of
> creating an Array:
>     // Create Array of FLOAT
>     Float[] floatArr =  new Float[2];
>     floatArr[0] = 64.87;
>     floatArr[1] = 89.96;
>     Array array = conn.createArrayOf("FLOAT", floatArr);
>     // Convert to byte[]
>     byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
>     // Set ptr to byte[]
>     ptr.set(arrayAsBytes);
> Thanks,
> James
> On Sat, Oct 1, 2016 at 9:19 AM, akhil jain <>
> wrote:
>> I am using hbase 1.1 with phoenix 4.8. I have a table with columns whose
>> datatype is 'VARBINARY'.
>> The data in these columns is compressed float[] in form of ByteBuffer
>> called DenseVector which is an ordered set of 16 bit IEEE floats of
>> cardinality no more than 3996.
>> I have loaded data into phoenix tables through spark-phoenix plugin. Just
>> to give an idea the mapreduce jobs write data in hive in parquet gzip
>> format. I read data into a dataframe using sqlContext.parquetFile() ,
>> register it as temp table and run a sqlContext.sql("select query ...")
>> query and finally calling"org.apache.phoenix.spark",
>> SaveMode.Overwrite, Map("table" -> "SITEFLOWTABLE" ,"zkUrl" ->
>> "localhost:2181"))
>> We have a hive/shark UDF(code pasted below) that can decode these
>> ByteBuffer columns and display them in readable float[]. So this UDF works
>> on spark-shell.
>> Now I just want to write a similar UDF in phoenix and run queries as "
>> select uplinkcostbuffer,DENSEVECTORUDF(uplinkcostbuffer) from
>> siteflowtable" and further write UDAFs over it.
>> How do I make phoenix UDF return float[] ?? I have tried a lot many
>> things but none seem to work.
>> Below is the code for hive/shark UDF-
>> ------------------------------------------------------------
>> ------------------------------
>> package com.ABCD.densevectorudf;
>> import java.nio.ByteBuffer;
>> import java.util.Vector;
>> import org.apache.commons.logging.Log;
>> import org.apache.commons.logging.LogFactory;
>> import org.apache.hadoop.hive.ql.exec.Description;
>> import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
>> import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
>> import org.apache.hadoop.hive.ql.metadata.HiveException;
>> import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
>> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspecto
>> rFactory;
>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Bina
>> ryObjectInspector;
>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Prim
>> itiveObjectInspectorFactory;
>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Stri
>> ngObjectInspector;
>> import;
>> import com.ABCD.common.attval.IDenseVectorOperator;
>> import com.ABCD.common.attval.Utility;
>> import com.ABCD.common.attval.array.BufferOperations;
>> import com.ABCD.common.attval.array.FloatArrayFactory;
>> @Description(name = "DenseVectorUDF",
>> value = "Dense Vector UDF in Hive / Shark\n"
>> + "_FUNC_(binary|hex) - "
>> + "Returns the dense vector array<float> value\n",
>> extended = "Dense Vector UDAF in Hive / Shark")
>> public class DenseVectorUDF extends GenericUDF {
>> private static final Log LOG = LogFactory.getLog(DenseVectorU
>> DF.class.getName());
>> private ObjectInspector inputOI;
>> private ListObjectInspector outputOI;
>> @Override
>> public String getDisplayString(String[] children) {
>> StringBuilder sb = new StringBuilder();
>> sb.append("densevectorudf(");
>> for (int i = 0; i < children.length; i++) {
>> sb.append(children[i]);
>> if (i + 1 != children.length) {
>> sb.append(",");
>> }
>> }
>> sb.append(")");
>> return sb.toString();
>> }
>> @Override
>> public ObjectInspector initialize(ObjectInspector[] arguments) throws
>> UDFArgumentException {
>> if (arguments.length == 1) {
>> ObjectInspector first = arguments[0];
>> if (!(first instanceof StringObjectInspector) && !(first instanceof
>> BinaryObjectInspector)) {
>> LOG.error("first argument must be a either binary or hex buffer");
>> throw new UDFArgumentException("first argument must be a either binary or
>> hex buffer");
>> }
>> inputOI = first;
>> outputOI = ObjectInspectorFactory.getStandardListObjectInspector(Primit
>> iveObjectInspectorFactory.writableFloatObjectInspector);
>> } else {
>> throw new UDFArgumentLengthException("Wrong argument length is passed.
>> Arguments length is NOT supported.");
>> }
>> return outputOI;
>> }
>> @Override
>> public Object evaluate(DeferredObject[] arguments) throws HiveException {
>> IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>> Object object = arguments[0].get();
>> Vector<Float> floatVector = null;
>> ByteBuffer buff = null;
>> if (inputOI instanceof StringObjectInspector) {
>> String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObjec
>> t(object);
>> buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
>> } else if (inputOI instanceof BinaryObjectInspector) {
>> byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObjec
>> t(object);
>> buff = ByteBuffer.wrap(bytes);
>> }
>> floatVector = idv.getElements(buff);
>> Object red [] = new Object[floatVector.size()];
>> for(int index = 0; index < red.length; index++){
>> red[index] = new FloatWritable(floatVector.get(index));
>> }
>>"Buffer header = " + BufferOperations.stringifyBuffer(buff));
>> return red;
>> }
>> }
>> ------------------------------------------------------------
>> ------------------------------
>> Following is the code I have written for Phoenix UDF-
>> ------------------------------------------------------------
>> ------------------------------
>> package org.apache.phoenix.expression.function;
>> import com.ABCD.common.attval.IDenseVectorOperator;
>> import com.ABCD.common.attval.array.BufferOperations;
>> import com.ABCD.common.attval.array.FloatArrayFactory;
>> import;
>> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspecto
>> rFactory;
>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Prim
>> itiveObjectInspectorFactory;
>> import;
>> import org.apache.phoenix.expression.Expression;
>> import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectI
>> nspector;
>> import org.apache.phoenix.parse.FunctionParseNode.Argument;
>> import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
>> import org.apache.phoenix.schema.SortOrder;
>> import org.apache.phoenix.schema.tuple.Tuple;
>> import org.apache.phoenix.schema.types.PDataType;
>> import org.apache.phoenix.schema.types.PFloatArray;
>> import org.apache.phoenix.schema.types.PVarbinary;
>> import java.nio.ByteBuffer;
>> import java.nio.FloatBuffer;
>> import java.sql.SQLException;
>> import java.util.List;
>> import java.util.Vector;
>> @BuiltInFunction(name = DenseVectorFunction.NAME, args = {
>>         @Argument(allowedTypes = {PVarbinary.class})})
>> public class DenseVectorFunction extends ScalarFunction {
>>     public static final String NAME = "DenseVectorFunction";
>>     private ListObjectInspector outputOI;
>>     public DenseVectorFunction() {
>>     }
>>     public DenseVectorFunction(List<Expression> children) throws
>> SQLException {
>>         super(children);
>>     }
>>     @Override
>>     public String getName() {
>>         return NAME;
>>     }
>>     public Expression getElementExpr() {
>>         return children.get(0);
>>     }
>>     @Override
>>     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
>>         if (!getElementExpr().evaluate(tuple, ptr)) {
>>             return false;
>>         }
>>         Object element = getElementExpr().getDataType().toObject(ptr,
>> getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
>> getElementExpr().getScale());
>>         IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>>         PhoenixBinaryObjectInspector pboi = new
>> PhoenixBinaryObjectInspector();
>>         byte[] bytes = pboi.getPrimitiveJavaObject(element);
>>         Object object = ptr.get();
>>         Vector<Float> floatVector = null;
>>         ByteBuffer buff = null;
>>         buff = ByteBuffer.wrap(bytes);
>>         floatVector = idv.getElements(buff);
>>         Object[] red = new Object[floatVector.size()];
>>         for (int index = 0; index < red.length; index++) {
>>             red[index] = new FloatWritable(floatVector.get(index));
>>             System.out.println("" + floatVector.get(index));
>>         }
>>         System.out.println("Buffer header = " +
>> BufferOperations.stringifyBuffer(buff)); // This prints header info in
>> ByteBuffer which is correct
>>         ptr.set(??);
>>         return true;
>>     }
>>     @Override
>>     public SortOrder getSortOrder() {
>>         return children.get(0).getSortOrder();
>>     }
>>     @Override
>>     public PDataType getDataType() {
>>         return PFloatArray.INSTANCE;
>>     }
>> }
>> ------------------------------------------------------------
>> ------------------------------
>> Any help will be much appreciated.

Reply via email to