[ 
https://issues.apache.org/jira/browse/SPARK-3947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15013623#comment-15013623
 ] 

Herman van Hovell commented on SPARK-3947:
------------------------------------------

Hello Milad,

Could you be a bit more specific? What is the problem you are having? Is there 
a difference between local mode and cluster mode? What version of Spark are you 
using?

I have adapted your code:
{noformat}
import java.math.BigDecimal

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, DataType, 
DoubleType, LongType}

/**
 * User-defined aggregate function that computes the geometric mean of a double column.
 *
 * The aggregation buffer holds the number of non-null values seen so far and their
 * running product; the result is `product ^ (1 / count)`. Null inputs are skipped, and
 * a group without any non-null input evaluates to SQL NULL.
 *
 * NOTE(review): the running product can overflow or underflow for long or extreme
 * inputs; accumulating a sum of logarithms would be more robust but changes rounding.
 */
class GeometricMean extends UserDefinedAggregateFunction {
  /** Schema of the single input argument: one double column. */
  def inputSchema: StructType =
    StructType(StructField("value", DoubleType) :: Nil)

  /** Schema of the intermediate buffer: value count (slot 0) and running product (slot 1). */
  def bufferSchema: StructType = StructType(
    StructField("count", LongType) ::
      StructField("product", DoubleType) :: Nil
  )

  /** Type of the final aggregate value. */
  def dataType: DataType = DoubleType

  /** The same input always produces the same output. */
  def deterministic: Boolean = true

  /** Resets the buffer to the identity state: zero values seen, product of 1.0. */
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }

  /**
   * Folds one input row into the buffer.
   *
   * Null inputs are ignored: reading a null cell with `getAs[Double]` would unbox to
   * 0.0, which would zero the product and still be counted as a value.
   */
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + 1
      buffer(1) = buffer.getDouble(1) * input.getDouble(0)
    }
  }

  /** Combines the partial aggregate in `buffer2` into `buffer1`. */
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getDouble(1) * buffer2.getDouble(1)
  }

  /**
   * Produces the geometric mean from the final buffer.
   *
   * Returns null (SQL NULL) when no values were aggregated; otherwise the exponent
   * would be 1/0 = Infinity and `math.pow(1.0, Infinity)` yields NaN.
   */
  def evaluate(buffer: Row): Any = {
    val count = buffer.getLong(0)
    if (count == 0L) null
    else math.pow(buffer.getDouble(1), 1.0d / count)
  }
}

// Make the aggregate callable from SQL under the name "gm".
sqlContext.udf.register("gm", new GeometricMean)

// Sample data, one tuple per receipt; the column names are applied below.
val receiptRows = Seq(
  (1, "italy", "emilia", 42, BigDecimal.valueOf(100, 0), "john"),
  (2, "italy", "toscana", 42, BigDecimal.valueOf(505, 1), "jim"),
  (3, "italy", "puglia", 42, BigDecimal.valueOf(70, 0), "jenn"),
  (4, "italy", "emilia", 42, BigDecimal.valueOf(75, 0), "jack"),
  (5, "uk", "london", 42, BigDecimal.valueOf(200, 0), "carl"),
  (6, "italy", "emilia", 42, BigDecimal.valueOf(42, 0), "john"))

val receiptColumns = Seq(
  "receipt_id", "store_country", "store_region", "store_id", "amount", "seller_name")

val df = receiptRows.toDF(receiptColumns: _*)
df.registerTempTable("receipts")

// Per-region average, sum and geometric mean of the Italian receipts above 50.
val receiptsQuery = """
select   store_country,
         store_region,
         avg(amount),
         sum(amount),
         gm(amount)
from     receipts
where    amount > 50
         and store_country = 'italy'
group by store_country, store_region
"""

val q = sql(receiptsQuery)

q.show()

// Result (Spark 1.5.2):
+-------------+------------+----+--------------------+-----------------+
|store_country|store_region| _c2|                 _c3|              _c4|
+-------------+------------+----+--------------------+-----------------+
|        italy|      emilia|87.5|175.0000000000000...|86.60254037844386|
|        italy|     toscana|50.5|50.50000000000000...|             50.5|
|        italy|      puglia|  70|70.00000000000000...|             70.0|
+-------------+------------+----+--------------------+-----------------+
{noformat}

And I really cannot find a problem.

> Support Scala/Java UDAF
> -----------------------
>
>                 Key: SPARK-3947
>                 URL: https://issues.apache.org/jira/browse/SPARK-3947
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Pei-Lun Lee
>            Assignee: Yin Huai
>             Fix For: 1.5.0
>
>         Attachments: spark-udaf.zip
>
>
> Right now only Hive UDAFs are supported. It would be nice to have UDAF 
> similar to UDF through SQLContext.registerFunction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to