I am using HBase 1.1 with Phoenix 4.8. I have a table with columns whose
datatype is VARBINARY.
The data in these columns is a compressed float[] stored as a ByteBuffer that
we call a DenseVector: an ordered set of 16-bit IEEE floats with cardinality
no more than 3996.
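(For context, since the heavy lifting is hidden in our own utility classes in the
code below: each element is an IEEE 754 half-precision value, and ignoring the
small header our buffers carry, decoding one 16-bit element to a Java float
amounts to something like the simplified sketch below. The method name and
layout are illustrative only, not our actual code.)

// Simplified half-precision (binary16) to float conversion, for illustration only.
// Our real decoding is done by IDenseVectorOperator/BufferOperations and also
// handles the DenseVector buffer header and byte order.
static float halfToFloat(short half) {
    int h    = half & 0xffff;
    int sign = (h >>> 15) & 0x1;
    int exp  = (h >>> 10) & 0x1f;  // 5-bit exponent, bias 15
    int mant = h & 0x3ff;          // 10-bit mantissa
    float value;
    if (exp == 0) {                // zero or subnormal
        value = Math.scalb((float) mant, -24);
    } else if (exp == 0x1f) {      // infinity or NaN
        value = (mant == 0) ? Float.POSITIVE_INFINITY : Float.NaN;
    } else {                       // normal number
        value = Math.scalb(1f + mant / 1024f, exp - 15);
    }
    return (sign == 0) ? value : -value;
}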
I load data into the Phoenix tables through the phoenix-spark plugin. To give
an idea of the pipeline: MapReduce jobs write the data into Hive in
gzip-compressed Parquet format; I read that data into a DataFrame using
sqlContext.parquetFile(), register it as a temp table, run a
sqlContext.sql("select query ...") query, and finally call
res.save("org.apache.phoenix.spark", SaveMode.Overwrite,
Map("table" -> "SITEFLOWTABLE", "zkUrl" -> "localhost:2181")).
We have a Hive/Shark UDF (code pasted below) that can decode these ByteBuffer
columns and display them as a readable float[]; this UDF works fine from
spark-shell.
Now I want to write a similar UDF in Phoenix, run queries such as
"select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from siteflowtable",
and eventually write UDAFs on top of it.
How do I make a Phoenix UDF return a float[]? I have tried many things, but
none seem to work.
Below is the code for the Hive/Shark UDF:
------------------------------------------------------------------------------
package com.ABCD.densevectorudf;

import java.nio.ByteBuffer;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.FloatWritable;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.Utility;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;

@Description(name = "DenseVectorUDF",
    value = "Dense Vector UDF in Hive / Shark\n"
        + "_FUNC_(binary|hex) - "
        + "Returns the dense vector array<float> value\n",
    extended = "Dense Vector UDAF in Hive / Shark")
public class DenseVectorUDF extends GenericUDF {

    private static final Log LOG = LogFactory.getLog(DenseVectorUDF.class.getName());

    private ObjectInspector inputOI;
    private ListObjectInspector outputOI;

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("densevectorudf(");
        for (int i = 0; i < children.length; i++) {
            sb.append(children[i]);
            if (i + 1 != children.length) {
                sb.append(",");
            }
        }
        sb.append(")");
        return sb.toString();
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length == 1) {
            ObjectInspector first = arguments[0];
            if (!(first instanceof StringObjectInspector) && !(first instanceof BinaryObjectInspector)) {
                LOG.error("first argument must be either a binary or hex buffer");
                throw new UDFArgumentException("first argument must be either a binary or hex buffer");
            }
            inputOI = first;
            outputOI = ObjectInspectorFactory.getStandardListObjectInspector(
                PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
        } else {
            throw new UDFArgumentLengthException("Wrong argument length is passed. Arguments length is NOT supported.");
        }
        return outputOI;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
        Object object = arguments[0].get();
        Vector<Float> floatVector = null;
        ByteBuffer buff = null;
        if (inputOI instanceof StringObjectInspector) {
            String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObject(object);
            buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
        } else if (inputOI instanceof BinaryObjectInspector) {
            byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObject(object);
            buff = ByteBuffer.wrap(bytes);
        }
        floatVector = idv.getElements(buff);
        Object[] red = new Object[floatVector.size()];
        for (int index = 0; index < red.length; index++) {
            red[index] = new FloatWritable(floatVector.get(index));
        }
        LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
        return red;
    }
}
------------------------------------------------------------------------------
Below is the code I have written for the Phoenix UDF:
------------------------------------------------------------------------------
package org.apache.phoenix.expression.function;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectInspector;
import org.apache.phoenix.parse.FunctionParseNode.Argument;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PFloatArray;
import org.apache.phoenix.schema.types.PVarbinary;

import java.nio.ByteBuffer;
import java.nio.FloatBuffer;
import java.sql.SQLException;
import java.util.List;
import java.util.Vector;

@BuiltInFunction(name = DenseVectorFunction.NAME, args = {
    @Argument(allowedTypes = {PVarbinary.class})})
public class DenseVectorFunction extends ScalarFunction {

    public static final String NAME = "DenseVectorFunction";

    private ListObjectInspector outputOI;

    public DenseVectorFunction() {
    }

    public DenseVectorFunction(List<Expression> children) throws SQLException {
        super(children);
    }

    @Override
    public String getName() {
        return NAME;
    }

    public Expression getElementExpr() {
        return children.get(0);
    }

    @Override
    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        if (!getElementExpr().evaluate(tuple, ptr)) {
            return false;
        }
        Object element = getElementExpr().getDataType().toObject(ptr,
            getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
            getElementExpr().getScale());
        IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
        PhoenixBinaryObjectInspector pboi = new PhoenixBinaryObjectInspector();
        byte[] bytes = pboi.getPrimitiveJavaObject(element);
        Object object = ptr.get();
        Vector<Float> floatVector = null;
        ByteBuffer buff = null;
        buff = ByteBuffer.wrap(bytes);
        floatVector = idv.getElements(buff);
        Object[] red = new Object[floatVector.size()];
        for (int index = 0; index < red.length; index++) {
            red[index] = new FloatWritable(floatVector.get(index));
            System.out.println("" + floatVector.get(index));
        }
        System.out.println("Buffer header = " + BufferOperations.stringifyBuffer(buff));
        // This prints the header info in the ByteBuffer, which is correct.
        // HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY?
        ptr.set(??);
        return true;
    }

    @Override
    public SortOrder getSortOrder() {
        return children.get(0).getSortOrder();
    }

    @Override
    public PDataType getDataType() {
        return PFloatArray.INSTANCE;
    }
}
------------------------------------------------------------------------------
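For what it is worth, my best guess so far, going by how the built-in array
functions look, is to build a PhoenixArray from the floats and serialize it
into ptr with the array type, roughly like the snippet below (in place of the
ptr.set(??) above). I have not been able to verify this, and I am not sure
PArrayDataType.instantiatePhoenixArray() plus PFloatArray.INSTANCE.toBytes()
is actually the intended way, which is exactly what I would like to confirm:

// Guesswork, not verified: assumes PArrayDataType.instantiatePhoenixArray()
// and PFloatArray.toBytes() can be used this way (imports would be
// org.apache.phoenix.schema.types.PArrayDataType, .PFloat and .PhoenixArray).
Float[] floats = new Float[floatVector.size()];
for (int index = 0; index < floats.length; index++) {
    floats[index] = floatVector.get(index);
}
PhoenixArray array = PArrayDataType.instantiatePhoenixArray(PFloat.INSTANCE, floats);
ptr.set(PFloatArray.INSTANCE.toBytes(array));
return true;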
Any help will be much appreciated.