[ https://issues.apache.org/jira/browse/SPARK-3947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15013623#comment-15013623 ]
Herman van Hovell commented on SPARK-3947: ------------------------------------------ Hello Milad, Could you be a bit more specific? What is the problem you are having? Is there a difference between local mode and cluster mode? What version of spark are you using? I have adapted your code: {noformat} import java.math.BigDecimal import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType, StructField, DataType, DoubleType, LongType} class GeometricMean extends UserDefinedAggregateFunction { def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil) def bufferSchema: StructType = StructType( StructField("count", LongType) :: StructField("product", DoubleType) :: Nil ) def dataType: DataType = DoubleType def deterministic: Boolean = true def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 1.0 } def update(buffer: MutableAggregationBuffer,input: Row): Unit = { buffer(0) = buffer.getAs[Long](0) + 1 buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0) } def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0) buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1) } def evaluate(buffer: Row): Any = { math.pow(buffer.getDouble(1), 1.0d / buffer.getLong(0)) } } sqlContext.udf.register("gm", new GeometricMean) val df = Seq( (1, "italy", "emilia", 42, BigDecimal.valueOf(100, 0), "john"), (2, "italy", "toscana", 42, BigDecimal.valueOf(505, 1), "jim"), (3, "italy", "puglia", 42, BigDecimal.valueOf(70, 0), "jenn"), (4, "italy", "emilia", 42, BigDecimal.valueOf(75 ,0), "jack"), (5, "uk", "london", 42, BigDecimal.valueOf(200 ,0), "carl"), (6, "italy", "emilia", 42, BigDecimal.valueOf(42, 0), "john")). 
toDF("receipt_id", "store_country", "store_region", "store_id", "amount", "seller_name") df.registerTempTable("receipts") val q = sql(""" select store_country, store_region, avg(amount), sum(amount), gm(amount) from receipts where amount > 50 and store_country = 'italy' group by store_country, store_region """) q.show // Result (SPARK 1.5.2): +-------------+------------+----+--------------------+-----------------+ |store_country|store_region| _c2| _c3| _c4| +-------------+------------+----+--------------------+-----------------+ | italy| emilia|87.5|175.0000000000000...|86.60254037844386| | italy| toscana|50.5|50.50000000000000...| 50.5| | italy| puglia| 70|70.00000000000000...| 70.0| +-------------+------------+----+--------------------+-----------------+ {noformat} With this code I cannot reproduce any problem. > Support Scala/Java UDAF > ----------------------- > > Key: SPARK-3947 > URL: https://issues.apache.org/jira/browse/SPARK-3947 > Project: Spark > Issue Type: Sub-task > Components: SQL > Reporter: Pei-Lun Lee > Assignee: Yin Huai > Fix For: 1.5.0 > > Attachments: spark-udaf.zip > > > Right now only Hive UDAFs are supported. It would be nice to have UDAFs > similar to UDFs through SQLContext.registerFunction. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org