Hi Pedro,

Thanks for your advice. I got my code working as below.

Code in main:

    import org.apache.spark.sql.functions.col

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    val hiveTable = hc.sql("select lp_location_id,id,pv from house_id_pv_location_top50")
    val jsonArray = new JsonArray
    // substr(2, 2048) drops the leading comma that update() prepends to every entry
    val middleResult = hiveTable.groupBy("lp_location_id")
      .agg(jsonArray(col("id"), col("pv")).substr(2, 2048).as("id_pv"))
    middleResult.collect.foreach(println)
    middleResult.write.saveAsTable("house_id_pv_top50_json")

Code in my UDAF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: StructType =
    StructType(StructField("id", IntegerType) :: StructField("pv", IntegerType) :: Nil)

  // Only the first buffer field is used; it holds the accumulated JSON fragments.
  def bufferSchema: StructType = StructType(
    StructField("id", StringType) :: StructField("pv", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }
  // Append one JSON object per input row, prefixed with a comma.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getString(0).concat(
      ",{\"id\":\"" + input.getInt(0) + "\",\"pv\":\"" + input.getInt(1) + "\"}")
  }
  // The buffer holds a String, so read it with getString rather than getAs[Int].
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getString(0).concat(buffer2.getString(0))
  }
  // evaluate is unchanged from my original message.
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

The result is what I expected, as shown in the attached file.
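
One note on the substr(2, 2048) call in the main code: it removes the leading comma, but it also keeps at most 2048 characters, so a group with many rows would be cut off. A possible alternative, sketched here in plain Scala without Spark, is to add the comma only when both sides are non-empty. The names SeparatorSketch, entry, and join are made up for this illustration and are not part of the code above; the same joining rule could be applied in both update and merge, which I assume would make the substr call unnecessary.

```scala
// Plain-Scala sketch (no Spark). SeparatorSketch, entry and join are
// hypothetical names used only for this illustration.
object SeparatorSketch {
  // One JSON object per (id, pv) pair, in the same shape the UDAF emits.
  def entry(id: Int, pv: Int): String =
    "{\"id\":\"" + id + "\",\"pv\":\"" + pv + "\"}"

  // Join two partial results, adding a comma only when both are non-empty,
  // so the result never starts or ends with a stray separator.
  def join(left: String, right: String): String =
    if (left.isEmpty) right
    else if (right.isEmpty) left
    else left + "," + right

  def main(args: Array[String]): Unit = {
    val rows = Seq((1, 10), (2, 20))
    val joined = rows.map { case (id, pv) => entry(id, pv) }.foldLeft("")(join)
    println(joined) // {"id":"1","pv":"10"},{"id":"2","pv":"20"}
  }
}
```

Because join treats the empty string as a neutral element, it behaves the same whether it combines a buffer with a new entry or two partial buffers.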

--------------------------------

 

Thanks & best regards!
San.Luo

----- Original Message -----
From: Pedro Rodriguez <ski.rodrig...@gmail.com>
To: luohui20...@sina.com
Cc: user <user@spark.apache.org>
Subject: Re: question about UDAF
Date: 2016-07-12 04:17

I am not sure I understand your code entirely, but I worked with UDAFs on Friday 
and over the weekend 
(https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a).

I think what is going on is that your "update" function is not defined 
correctly. Update should take a possibly initialized or in-progress buffer and 
integrate the new input row into it. Right now, you ignore the input row. What 
is probably occurring is that initialize sets the buffer to "" and update then 
sets the buffer equal to itself, so it stays "".

Merge is responsible for taking two buffers and merging them together. In this 
case, both buffers are "" since initialize makes them "" and update keeps them 
"", so the result is just "". I am not sure it matters, but since the buffer 
holds a String you probably also want to read it with buffer.getString(0) 
rather than buffer.getAs[Int](0).
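
To make the point about update and merge concrete, here is a rough plain-Scala model of the aggregation lifecycle. It does not use Spark at all; Agg, run, broken, and fixed are names made up for this sketch, and the real engine decides for itself how rows are split into partitions and when merge is called.

```scala
// Plain-Scala model of the initialize/update/merge flow; no Spark required.
// Agg, run, broken and fixed are hypothetical names for this sketch.
object UdafLifecycleSketch {
  final case class Agg(
      initialize: String,
      update: (String, Int) => String,
      merge: (String, String) => String)

  // Each partition starts from `initialize` and folds its rows with `update`;
  // the partial buffers are then combined with `merge`.
  def run(agg: Agg, partitions: Seq[Seq[Int]]): String =
    partitions
      .map(_.foldLeft(agg.initialize)(agg.update))
      .foldLeft(agg.initialize)(agg.merge)

  // Mirrors the original update: the input row is ignored, so the buffer stays "".
  val broken: Agg = Agg("", (buf, _) => buf, _ + _)

  // Integrates each input value into the buffer.
  val fixed: Agg = Agg("", (buf, id) => buf + "," + id, _ + _)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2), Seq(3))
    println(run(broken, partitions).isEmpty) // true
    println(run(fixed, partitions))          // ,1,2,3
  }
}
```

With the broken update, every partial buffer is still "" when merge runs, so concatenating them can only produce "".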
Pedro
On Mon, Jul 11, 2016 at 3:04 AM,  <luohui20...@sina.com> wrote:
Hello guys,

I have a DF and a UDAF. This DF has 2 columns, lp_location_id and id, both of 
Int type. I want to group by lp_location_id and aggregate all values of id 
into one string. So I used a UDAF to do this transformation: multiple Int 
values to one String. However, my UDAF returns empty values, as shown in the 
attachment.

Here is the code of my main class:

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    val hiveTable = hc.sql("select lp_location_id,id from house_id_pv_location_top50")

    val jsonArray = new JsonArray
    val result = hiveTable.groupBy("lp_location_id")
      .agg(jsonArray(col("id")).as("jsonArray"))
      .collect.foreach(println)
------------------------------------------------------------------
Here is the code of my UDAF:
class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("id", IntegerType) :: Nil)
    
  def bufferSchema: StructType = StructType(
    StructField("id", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val s1 = buffer1.getAs[Int](0).toString()
    val s2 = buffer2.getAs[Int](0).toString()
    buffer1(0) = s1.concat(s2)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

I don't quite understand why I get an empty result from my UDAF. I guess there 
may be 2 reasons:
1. wrong initialization with "" in the initialize method
2. the buffer was not written successfully.
Can anyone share an idea about this? Thank you.



--------------------------------

 

Thanks & best regards!
San.Luo



---------------------------------------------------------------------

To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience


