Thanks James. It worked. Could you please provide me with pointers on writing UDAFs in Phoenix, analogous to the GenericUDAFEvaluator used for writing Hive UDAFs? I am looking for a tutorial like http://beekeeperdata.com/posts/hadoop/2015/08/17/hive-udaf-tutorial.html, but for Phoenix.
Thanks, Akhil On Sun, Oct 2, 2016 at 7:03 AM, James Taylor <jamestay...@apache.org> wrote: > Hi Akhil, > You want to create an Array, convert it to its byte[] representation, and > set the ptr argument to point to it. Take a look at ArrayIT for examples of > creating an Array: > > // Create Array of FLOAT > Float[] floatArr = new Float[2]; > floatArr[0] = 64.87; > floatArr[1] = 89.96; > Array array = conn.createArrayOf("FLOAT", floatArr); > // Convert to byte[] > byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array); > // Set ptr to byte[] > ptr.set(arrayAsBytes); > > Thanks, > James > > > On Sat, Oct 1, 2016 at 9:19 AM, akhil jain <akhilcancer...@gmail.com> > wrote: > >> I am using hbase 1.1 with phoenix 4.8. I have a table with columns whose >> datatype is 'VARBINARY'. >> The data in these columns is compressed float[] in form of ByteBuffer >> called DenseVector which is an ordered set of 16 bit IEEE floats of >> cardinality no more than 3996. >> I have loaded data into phoenix tables through spark-phoenix plugin. Just >> to give an idea the mapreduce jobs write data in hive in parquet gzip >> format. I read data into a dataframe using sqlContext.parquetFile() , >> register it as temp table and run a sqlContext.sql("select query ...") >> query and finally calling res.save("org.apache.phoenix.spark", >> SaveMode.Overwrite, Map("table" -> "SITEFLOWTABLE" ,"zkUrl" -> >> "localhost:2181")) >> We have a hive/shark UDF(code pasted below) that can decode these >> ByteBuffer columns and display them in readable float[]. So this UDF works >> on spark-shell. >> Now I just want to write a similar UDF in phoenix and run queries as " >> select uplinkcostbuffer,DENSEVECTORUDF(uplinkcostbuffer) from >> siteflowtable" and further write UDAFs over it. >> How do I make phoenix UDF return float[] ?? I have tried a lot many >> things but none seem to work. 
>> >> Below is the code for hive/shark UDF- >> ------------------------------------------------------------ >> ------------------------------ >> package com.ABCD.densevectorudf; >> >> import java.nio.ByteBuffer; >> import java.util.Vector; >> >> import org.apache.commons.logging.Log; >> import org.apache.commons.logging.LogFactory; >> import org.apache.hadoop.hive.ql.exec.Description; >> import org.apache.hadoop.hive.ql.exec.UDFArgumentException; >> import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; >> import org.apache.hadoop.hive.ql.metadata.HiveException; >> import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; >> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; >> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; >> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspecto >> rFactory; >> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Bina >> ryObjectInspector; >> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Prim >> itiveObjectInspectorFactory; >> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Stri >> ngObjectInspector; >> import org.apache.hadoop.io.FloatWritable; >> >> import com.ABCD.common.attval.IDenseVectorOperator; >> import com.ABCD.common.attval.Utility; >> import com.ABCD.common.attval.array.BufferOperations; >> import com.ABCD.common.attval.array.FloatArrayFactory; >> >> @Description(name = "DenseVectorUDF", >> value = "Dense Vector UDF in Hive / Shark\n" >> + "_FUNC_(binary|hex) - " >> + "Returns the dense vector array<float> value\n", >> extended = "Dense Vector UDAF in Hive / Shark") >> >> public class DenseVectorUDF extends GenericUDF { >> private static final Log LOG = LogFactory.getLog(DenseVectorU >> DF.class.getName()); >> private ObjectInspector inputOI; >> private ListObjectInspector outputOI; >> >> @Override >> public String getDisplayString(String[] children) { >> StringBuilder sb = new StringBuilder(); >> 
sb.append("densevectorudf("); >> for (int i = 0; i < children.length; i++) { >> sb.append(children[i]); >> if (i + 1 != children.length) { >> sb.append(","); >> } >> } >> sb.append(")"); >> return sb.toString(); >> } >> >> @Override >> public ObjectInspector initialize(ObjectInspector[] arguments) throws >> UDFArgumentException { >> if (arguments.length == 1) { >> ObjectInspector first = arguments[0]; >> if (!(first instanceof StringObjectInspector) && !(first instanceof >> BinaryObjectInspector)) { >> LOG.error("first argument must be a either binary or hex buffer"); >> throw new UDFArgumentException("first argument must be a either binary or >> hex buffer"); >> } >> inputOI = first; >> outputOI = ObjectInspectorFactory.getStandardListObjectInspector(Primit >> iveObjectInspectorFactory.writableFloatObjectInspector); >> } else { >> throw new UDFArgumentLengthException("Wrong argument length is passed. >> Arguments length is NOT supported."); >> } >> return outputOI; >> } >> >> @Override >> public Object evaluate(DeferredObject[] arguments) throws HiveException { >> IDenseVectorOperator idv = FloatArrayFactory.getFloatArray(); >> Object object = arguments[0].get(); >> Vector<Float> floatVector = null; >> ByteBuffer buff = null; >> if (inputOI instanceof StringObjectInspector) { >> String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObjec >> t(object); >> buff = ByteBuffer.wrap(Utility.hexToBytes(hex)); >> } else if (inputOI instanceof BinaryObjectInspector) { >> byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObjec >> t(object); >> buff = ByteBuffer.wrap(bytes); >> } >> floatVector = idv.getElements(buff); >> Object red [] = new Object[floatVector.size()]; >> for(int index = 0; index < red.length; index++){ >> red[index] = new FloatWritable(floatVector.get(index)); >> } >> LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff)); >> return red; >> } >> } >> >> ------------------------------------------------------------ >> 
------------------------------ >> >> >> Following is the code I have written for Phoenix UDF- >> ------------------------------------------------------------ >> ------------------------------ >> package org.apache.phoenix.expression.function; >> >> import com.ABCD.common.attval.IDenseVectorOperator; >> import com.ABCD.common.attval.array.BufferOperations; >> import com.ABCD.common.attval.array.FloatArrayFactory; >> import org.apache.hadoop.hbase.io.ImmutableBytesWritable; >> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; >> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspecto >> rFactory; >> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Prim >> itiveObjectInspectorFactory; >> import org.apache.hadoop.io.FloatWritable; >> import org.apache.phoenix.expression.Expression; >> import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectI >> nspector; >> import org.apache.phoenix.parse.FunctionParseNode.Argument; >> import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction; >> import org.apache.phoenix.schema.SortOrder; >> import org.apache.phoenix.schema.tuple.Tuple; >> import org.apache.phoenix.schema.types.PDataType; >> import org.apache.phoenix.schema.types.PFloatArray; >> import org.apache.phoenix.schema.types.PVarbinary; >> >> import java.nio.ByteBuffer; >> import java.nio.FloatBuffer; >> import java.sql.SQLException; >> import java.util.List; >> import java.util.Vector; >> >> @BuiltInFunction(name = DenseVectorFunction.NAME, args = { >> @Argument(allowedTypes = {PVarbinary.class})}) >> public class DenseVectorFunction extends ScalarFunction { >> public static final String NAME = "DenseVectorFunction"; >> private ListObjectInspector outputOI; >> >> public DenseVectorFunction() { >> } >> >> public DenseVectorFunction(List<Expression> children) throws >> SQLException { >> super(children); >> } >> >> @Override >> public String getName() { >> return NAME; >> } >> >> public Expression 
getElementExpr() { >> return children.get(0); >> } >> >> @Override >> public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { >> if (!getElementExpr().evaluate(tuple, ptr)) { >> return false; >> } >> Object element = getElementExpr().getDataType().toObject(ptr, >> getElementExpr().getSortOrder(), getElementExpr().getMaxLength(), >> getElementExpr().getScale()); >> IDenseVectorOperator idv = FloatArrayFactory.getFloatArray(); >> PhoenixBinaryObjectInspector pboi = new >> PhoenixBinaryObjectInspector(); >> byte[] bytes = pboi.getPrimitiveJavaObject(element); >> Object object = ptr.get(); >> Vector<Float> floatVector = null; >> ByteBuffer buff = null; >> buff = ByteBuffer.wrap(bytes); >> floatVector = idv.getElements(buff); >> >> Object[] red = new Object[floatVector.size()]; >> for (int index = 0; index < red.length; index++) { >> red[index] = new FloatWritable(floatVector.get(index)); >> System.out.println("" + floatVector.get(index)); >> } >> System.out.println("Buffer header = " + >> BufferOperations.stringifyBuffer(buff)); // This prints header info in >> ByteBuffer which is correct >> //HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY >> ptr.set(??); >> return true; >> } >> >> @Override >> public SortOrder getSortOrder() { >> return children.get(0).getSortOrder(); >> } >> >> @Override >> public PDataType getDataType() { >> return PFloatArray.INSTANCE; >> } >> } >> ------------------------------------------------------------ >> ------------------------------ >> >> Any help will be much appreciated. >> > >