Hi Akhil,
You want to create an Array, convert it to its byte[] representation, and
set the ptr argument to point to it. Take a look at ArrayIT for examples of
creating an Array:
// Create Array of FLOAT
Float[] floatArr = new Float[2];
floatArr[0] = 64.87f;
floatArr[1] = 89.96f;
Array array = conn.createArrayOf("FLOAT", floatArr);
// Convert to byte[]
byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
// Set ptr to byte[]
ptr.set(arrayAsBytes);
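If you'd rather not go through a Connection inside evaluate(), here's a rough
sketch of the same idea built directly from your decoded Vector<Float>. It
assumes PArrayDataType.instantiatePhoenixArray(PFloat.INSTANCE, ...) is
available in your 4.8 build (imports: org.apache.phoenix.schema.types.PFloat,
PArrayDataType, PhoenixArray) - untested, so treat it as a starting point:
// Inside DenseVectorFunction.evaluate(), after decoding the ByteBuffer:
Float[] floats = new Float[floatVector.size()];
for (int i = 0; i < floats.length; i++) {
    floats[i] = floatVector.get(i);
}
// Build a Phoenix array value without needing a JDBC Connection
PhoenixArray array = PArrayDataType.instantiatePhoenixArray(PFloat.INSTANCE, floats);
// Serialize it with the same type you return from getDataType(), then set ptr
ptr.set(PFloatArray.INSTANCE.toBytes(array));
return true;
Either way, the important part is that whatever Array you construct gets
serialized with PFloatArray.INSTANCE.toBytes() (matching getDataType()) before
calling ptr.set().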
Thanks,
James
On Sat, Oct 1, 2016 at 9:19 AM, akhil jain <[email protected]> wrote:
> I am using HBase 1.1 with Phoenix 4.8. I have a table with columns whose
> datatype is VARBINARY.
> The data in these columns is a compressed float[] in the form of a ByteBuffer
> called DenseVector, which is an ordered set of 16-bit IEEE floats with a
> cardinality of no more than 3996.
> I have loaded data into the Phoenix tables through the phoenix-spark plugin.
> To give an idea: the MapReduce jobs write data into Hive in Parquet gzip
> format. I read the data into a DataFrame using sqlContext.parquetFile(),
> register it as a temp table, run a sqlContext.sql("select query ...") query,
> and finally call res.save("org.apache.phoenix.spark", SaveMode.Overwrite,
> Map("table" -> "SITEFLOWTABLE", "zkUrl" -> "localhost:2181")).
> We have a Hive/Shark UDF (code pasted below) that can decode these ByteBuffer
> columns and display them as a readable float[]. This UDF works on the
> spark-shell.
> Now I want to write a similar UDF in Phoenix, run queries such as
> "select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from
> siteflowtable", and later write UDAFs over it.
> How do I make a Phoenix UDF return float[]? I have tried many things, but
> none seem to work.
>
> Below is the code for the Hive/Shark UDF:
> ------------------------------------------------------------------------------
> package com.ABCD.densevectorudf;
>
> import java.nio.ByteBuffer;
> import java.util.Vector;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.hive.ql.exec.Description;
> import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
> import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
> import org.apache.hadoop.hive.ql.metadata.HiveException;
> import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
> import org.apache.hadoop.io.FloatWritable;
>
> import com.ABCD.common.attval.IDenseVectorOperator;
> import com.ABCD.common.attval.Utility;
> import com.ABCD.common.attval.array.BufferOperations;
> import com.ABCD.common.attval.array.FloatArrayFactory;
>
> @Description(name = "DenseVectorUDF",
> value = "Dense Vector UDF in Hive / Shark\n"
> + "_FUNC_(binary|hex) - "
> + "Returns the dense vector array<float> value\n",
> extended = "Dense Vector UDAF in Hive / Shark")
>
> public class DenseVectorUDF extends GenericUDF {
> private static final Log LOG = LogFactory.getLog(DenseVectorUDF.class.getName());
> private ObjectInspector inputOI;
> private ListObjectInspector outputOI;
>
> @Override
> public String getDisplayString(String[] children) {
> StringBuilder sb = new StringBuilder();
> sb.append("densevectorudf(");
> for (int i = 0; i < children.length; i++) {
> sb.append(children[i]);
> if (i + 1 != children.length) {
> sb.append(",");
> }
> }
> sb.append(")");
> return sb.toString();
> }
>
> @Override
> public ObjectInspector initialize(ObjectInspector[] arguments) throws
> UDFArgumentException {
> if (arguments.length == 1) {
> ObjectInspector first = arguments[0];
> if (!(first instanceof StringObjectInspector) && !(first instanceof BinaryObjectInspector)) {
> LOG.error("first argument must be either a binary or a hex buffer");
> throw new UDFArgumentException("first argument must be either a binary or a hex buffer");
> }
> inputOI = first;
> outputOI = ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
> } else {
> throw new UDFArgumentLengthException("Wrong number of arguments: exactly one argument is supported.");
> }
> return outputOI;
> }
>
> @Override
> public Object evaluate(DeferredObject[] arguments) throws HiveException {
> IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
> Object object = arguments[0].get();
> Vector<Float> floatVector = null;
> ByteBuffer buff = null;
> if (inputOI instanceof StringObjectInspector) {
> String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObject(object);
> buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
> } else if (inputOI instanceof BinaryObjectInspector) {
> byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObject(object);
> buff = ByteBuffer.wrap(bytes);
> }
> floatVector = idv.getElements(buff);
> Object red [] = new Object[floatVector.size()];
> for(int index = 0; index < red.length; index++){
> red[index] = new FloatWritable(floatVector.get(index));
> }
> LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
> return red;
> }
> }
>
> ------------------------------------------------------------------------------
>
>
> Following is the code I have written for the Phoenix UDF:
> ------------------------------------------------------------------------------
> package org.apache.phoenix.expression.function;
>
> import com.ABCD.common.attval.IDenseVectorOperator;
> import com.ABCD.common.attval.array.BufferOperations;
> import com.ABCD.common.attval.array.FloatArrayFactory;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
> import org.apache.hadoop.io.FloatWritable;
> import org.apache.phoenix.expression.Expression;
> import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectInspector;
> import org.apache.phoenix.parse.FunctionParseNode.Argument;
> import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
> import org.apache.phoenix.schema.SortOrder;
> import org.apache.phoenix.schema.tuple.Tuple;
> import org.apache.phoenix.schema.types.PDataType;
> import org.apache.phoenix.schema.types.PFloatArray;
> import org.apache.phoenix.schema.types.PVarbinary;
>
> import java.nio.ByteBuffer;
> import java.nio.FloatBuffer;
> import java.sql.SQLException;
> import java.util.List;
> import java.util.Vector;
>
> @BuiltInFunction(name = DenseVectorFunction.NAME, args = {
> @Argument(allowedTypes = {PVarbinary.class})})
> public class DenseVectorFunction extends ScalarFunction {
> public static final String NAME = "DenseVectorFunction";
> private ListObjectInspector outputOI;
>
> public DenseVectorFunction() {
> }
>
> public DenseVectorFunction(List<Expression> children) throws
> SQLException {
> super(children);
> }
>
> @Override
> public String getName() {
> return NAME;
> }
>
> public Expression getElementExpr() {
> return children.get(0);
> }
>
> @Override
> public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
> if (!getElementExpr().evaluate(tuple, ptr)) {
> return false;
> }
> Object element = getElementExpr().getDataType().toObject(ptr,
> getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
> getElementExpr().getScale());
> IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
> PhoenixBinaryObjectInspector pboi = new
> PhoenixBinaryObjectInspector();
> byte[] bytes = pboi.getPrimitiveJavaObject(element);
> Object object = ptr.get();
> Vector<Float> floatVector = null;
> ByteBuffer buff = null;
> buff = ByteBuffer.wrap(bytes);
> floatVector = idv.getElements(buff);
>
> Object[] red = new Object[floatVector.size()];
> for (int index = 0; index < red.length; index++) {
> red[index] = new FloatWritable(floatVector.get(index));
> System.out.println("" + floatVector.get(index));
> }
> System.out.println("Buffer header = " +
> BufferOperations.stringifyBuffer(buff)); // This prints header info in
> ByteBuffer which is correct
> //HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY
> ptr.set(??);
> return true;
> }
>
> @Override
> public SortOrder getSortOrder() {
> return children.get(0).getSortOrder();
> }
>
> @Override
> public PDataType getDataType() {
> return PFloatArray.INSTANCE;
> }
> }
> ------------------------------------------------------------------------------
>
> Any help will be much appreciated.
>