I am not sure I understand your code entirely, but I worked with UDAFs on Friday and over the weekend (https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a).
I think what is going on is that your "update" function is not defined correctly. Update should take a possibly initialized or in-progress buffer and integrate the new input row into it. Right now, you ignore the input row. What is probably occurring is that update sets the buffer equal to the buffer itself, which is still the initialization value "". Merge is responsible for taking two buffers and merging them together. In this case, both buffers are "", since initialize makes them "" and update keeps them "", so the result is just "".

I am not sure it matters, but you probably also want to read the buffer with buffer.getString(0), since the buffer schema declares a StringType column.

Pedro

On Mon, Jul 11, 2016 at 3:04 AM, <luohui20...@sina.com> wrote:

> hello guys:
>      I have a DF and a UDAF. this DF has 2 columns, lp_location_id , id,
> both are of Int type. I want to group by id and aggregate all value of id
> into 1 string. So I used a UDAF to do this transformation: multi Int values
> to 1 String. However my UDAF returns empty values as the accessory attached.
>      Here is my code for my main class:
>
> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
> val hiveTable = hc.sql("select lp_location_id,id from house_id_pv_location_top50")
>
> val jsonArray = new JsonArray
> val result =
>   hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id")).as("jsonArray")).collect.foreach(println)
>
> ------------------------------------------------------------------
> Here is my code of my UDAF:
>
> class JsonArray extends UserDefinedAggregateFunction {
>   def inputSchema: org.apache.spark.sql.types.StructType =
>     StructType(StructField("id", IntegerType) :: Nil)
>
>   def bufferSchema: StructType = StructType(
>     StructField("id", StringType) :: Nil)
>
>   def dataType: DataType = StringType
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = ""
>   }
>
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     buffer(0) = buffer.getAs[Int](0)
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     val s1 = buffer1.getAs[Int](0).toString()
>     val s2 = buffer2.getAs[Int](0).toString()
>     buffer1(0) = s1.concat(s2)
>   }
>
>   def evaluate(buffer: Row): Any = {
>     buffer(0)
>   }
> }
>
> I don't quit understand why I get empty result from my UDAF, I guess there
> may be 2 reason:
>     1. error initialization with "" in code of define initialize method
>     2. the buffer didn't write successfully.
>
> can anyone share a idea about this. thank you.
>
> --------------------------------
>
> Thanks&Best regards!
> San.Luo

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
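
For reference, here is a rough sketch of how the quoted UDAF might look once update actually reads the input row and both update and merge read the buffer as a String, as described above. It is not tested against your table. The comma separator, the square brackets added in evaluate, the null check, the buffer field name "ids", and the helper joinIds are all assumptions of mine for illustration; they are not something your original code specifies.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class JsonArray extends UserDefinedAggregateFunction {
  // One Int input column, as in the original code.
  def inputSchema: StructType = StructType(StructField("id", IntegerType) :: Nil)

  // The buffer holds the ids seen so far as one String ("ids" is an assumed name).
  def bufferSchema: StructType = StructType(StructField("ids", StringType) :: Nil)

  def dataType: DataType = StringType

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }

  // Hypothetical helper: join two partial results with a comma (assumed
  // separator), skipping whichever side is still empty.
  private def joinIds(a: String, b: String): String =
    if (a.isEmpty) b else if (b.isEmpty) a else a + "," + b

  // Integrate the new input row into the in-progress buffer.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) { // skipping nulls is an assumption
      buffer(0) = joinIds(buffer.getString(0), input.getInt(0).toString)
    }
  }

  // Combine two partial buffers; both hold Strings, so read them with getString.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = joinIds(buffer1.getString(0), buffer2.getString(0))
  }

  // Wrapping the result in brackets is an assumption based on the class name.
  def evaluate(buffer: Row): Any = s"[${buffer.getString(0)}]"
}
```

It should plug into the same groupBy("lp_location_id").agg(jsonArray(col("id")).as("jsonArray")) call from your main class. Note that the order of the ids inside each string follows the order in which rows and partial buffers happen to be processed, so it is not guaranteed to be stable between runs.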