I am not sure I understand your code entirely, but I worked with UDAFs on
Friday and over the weekend
(https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a).

I think the problem is that your "update" function is not defined
correctly. Update should take a buffer that is either freshly initialized or
already in progress and fold the new input row into it. Right now you ignore
the input row entirely: buffer(0) = buffer.getAs[Int](0) only reads the
buffer back and writes it to itself. Since initialize sets the buffer to "",
it most likely stays "" no matter how many rows arrive.
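
To make the lifecycle concrete, here is a rough plain-Scala model of it. This
is NOT the Spark API: the Buffer type, the run helper and the idea of passing
partitions as Seq[Seq[Int]] are my own hypothetical stand-ins. Each partition
gets its own buffer, rows go through update, and the partial buffers are then
merged. With an update that never reads its input, the result is always "":

```scala
// Plain-Scala model of the UDAF lifecycle (hypothetical, not Spark's API).
object UdafModel {
  // One-slot mutable buffer, standing in for MutableAggregationBuffer.
  type Buffer = Array[Any]

  def initialize(buffer: Buffer): Unit = {
    buffer(0) = ""
  }

  // Mirrors the original: writes the buffer back to itself, ignores `input`.
  def buggyUpdate(buffer: Buffer, input: Int): Unit = {
    buffer(0) = buffer(0)
  }

  // Fixed: fold the incoming id into the accumulated string.
  def fixedUpdate(buffer: Buffer, input: Int): Unit = {
    val acc = buffer(0).asInstanceOf[String]
    buffer(0) = if (acc.isEmpty) input.toString else acc + "," + input
  }

  // Combine two partial results, skipping empty sides to avoid stray commas.
  def merge(b1: Buffer, b2: Buffer): Unit = {
    val s1 = b1(0).asInstanceOf[String]
    val s2 = b2(0).asInstanceOf[String]
    b1(0) = if (s1.isEmpty) s2 else if (s2.isEmpty) s1 else s1 + "," + s2
  }

  // Run update per partition, then merge every partial buffer into one result.
  def run(partitions: Seq[Seq[Int]], update: (Buffer, Int) => Unit): String = {
    val partials = partitions.map { rows =>
      val b: Buffer = new Array[Any](1)
      initialize(b)
      rows.foreach(r => update(b, r))
      b
    }
    val result: Buffer = new Array[Any](1)
    initialize(result)
    partials.foreach(p => merge(result, p))
    result(0).asInstanceOf[String]
  }
}
```

For example, UdafModel.run(Seq(Seq(1, 2), Seq(3)), UdafModel.buggyUpdate)
gives "", while the same call with UdafModel.fixedUpdate gives "1,2,3".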

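Merge is responsible for taking two partial buffers and combining them. In
this case both buffers are "", since initialize makes them "" and update
keeps them "", so the concatenated result is also just "". I am not sure it
matters for the empty result, but merge also reads the buffers with
getAs[Int] even though bufferSchema declares a StringType, so you probably
want buffer1.getString(0) and buffer2.getString(0) there instead.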
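
Putting both fixes together, a corrected version might look roughly like the
sketch below. It assumes the Spark 1.6/2.x UserDefinedAggregateFunction API
you are already using (later Spark releases deprecate it in favor of
Aggregator). I kept your class name, but the output is a comma-separated
string rather than real JSON, and the join helper on the companion object is
my own addition:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Concatenates the Int `id` values of each group into one comma-separated String.
class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("id", IntegerType) :: Nil)

  // The buffer holds a String, so it is always read back with getString.
  def bufferSchema: StructType = StructType(StructField("acc", StringType) :: Nil)

  def dataType: DataType = StringType

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }

  // Fold the incoming row's id into the buffer instead of ignoring `input`.
  // Null ids are skipped, since getInt on a null field would throw.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = JsonArray.join(buffer.getString(0), input.getInt(0).toString)
    }
  }

  // Combine two partial results; both sides are Strings, not Ints.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = JsonArray.join(buffer1.getString(0), buffer2.getString(0))
  }

  def evaluate(buffer: Row): Any = buffer.getString(0)
}

object JsonArray {
  // Hypothetical helper: join two fragments with a comma, skipping empty ones.
  def join(a: String, b: String): String =
    if (a.isEmpty) b else if (b.isEmpty) a else a + "," + b
}
```

You would call it exactly as in your main class, e.g.
hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id")).as("jsonArray")).
Spark does not guarantee the order in which rows reach update or partial
buffers reach merge, so the order of ids within each string may vary between
runs.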

Pedro

On Mon, Jul 11, 2016 at 3:04 AM, <luohui20...@sina.com> wrote:

> Hello guys,
>      I have a DF and a UDAF. The DF has 2 columns, lp_location_id and id,
> both of Int type. I want to group by lp_location_id and aggregate all values
> of id into 1 string, so I used a UDAF to do this transformation: multiple Int
> values to 1 String. However, my UDAF returns empty values, as shown in the
> attachment.
>      Here is the code for my main class:
>     val hc = new org.apache.spark.sql.hive.HiveContext(sc)
>     val hiveTable = hc.sql("select lp_location_id,id from house_id_pv_location_top50")
>
>     val jsonArray = new JsonArray
>     val result =
>       hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id")).as("jsonArray")).collect.foreach(println)
>
> ------------------------------------------------------------------
>      Here is the code of my UDAF:
>
> class JsonArray extends UserDefinedAggregateFunction {
>   def inputSchema: org.apache.spark.sql.types.StructType =
>     StructType(StructField("id", IntegerType) :: Nil)
>
>   def bufferSchema: StructType = StructType(
>     StructField("id", StringType) :: Nil)
>
>   def dataType: DataType = StringType
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = ""
>   }
>
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     buffer(0) = buffer.getAs[Int](0)
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     val s1 = buffer1.getAs[Int](0).toString()
>     val s2 = buffer2.getAs[Int](0).toString()
>     buffer1(0) = s1.concat(s2)
>   }
>
>   def evaluate(buffer: Row): Any = {
>     buffer(0)
>   }
> }
>
>
> I don't quite understand why I get an empty result from my UDAF. I guess
> there may be 2 reasons:
> 1. wrong initialization with "" in the initialize method
> 2. the buffer isn't written successfully.
>
> Can anyone share an idea about this? Thank you.
>
>
>
>
> --------------------------------
>
> Thanks & Best regards!
> San.Luo
>
>
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience
