[ https://issues.apache.org/jira/browse/SPARK-12491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15072902#comment-15072902 ]
Tristan commented on SPARK-12491:
---------------------------------

REPL session with logical plans:

{code:None|borderStyle=solid}
scala> import com.pipeline.spark._
import com.pipeline.spark._

scala> sqlContext.udf.register("gm", new GeometricMean)
res0: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = com.pipeline.spark.GeometricMean@497031ea

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val ids = sqlContext.range(1, 20)
ids: org.apache.spark.sql.DataFrame = [id: bigint]

scala> ids.registerTempTable("ids")

scala> val df = sqlContext.sql("select id, id % 3 as group_id from ids")
df: org.apache.spark.sql.DataFrame = [id: bigint, group_id: bigint]

scala> df.registerTempTable("simple")

scala> val q = sqlContext.sql("select group_id, gm(id) from simple group by group_id")
q: org.apache.spark.sql.DataFrame = [group_id: bigint, _c1: double]

scala> q.explain(true)
== Parsed Logical Plan ==
'Aggregate ['group_id], [unresolvedalias('group_id),unresolvedalias('gm('id))]
 'UnresolvedRelation [simple], None

== Analyzed Logical Plan ==
group_id: bigint, _c1: double
Aggregate [group_id#1L], [group_id#1L,(GeometricMean(cast(id#0L as double)),mode=Complete,isDistinct=false) AS _c1#12]
 Subquery simple
  Project [id#0L,(id#0L % cast(3 as bigint)) AS group_id#1L]
   Subquery ids
    LogicalRDD [id#0L], MapPartitionsRDD[3] at range at <console>:25

== Optimized Logical Plan ==
Aggregate [group_id#1L], [group_id#1L,(GeometricMean(cast(id#0L as double)),mode=Complete,isDistinct=false) AS _c1#12]
 Project [id#0L,(id#0L % 3) AS group_id#1L]
  LogicalRDD [id#0L], MapPartitionsRDD[3] at range at <console>:25

== Physical Plan ==
SortBasedAggregate(key=[group_id#1L], functions=[(GeometricMean(cast(id#0L as double)),mode=Final,isDistinct=false)], output=[group_id#1L,_c1#12])
 ConvertToSafe
  TungstenSort [group_id#1L ASC], false, 0
   TungstenExchange hashpartitioning(group_id#1L)
    ConvertToUnsafe
     SortBasedAggregate(key=[group_id#1L], functions=[(GeometricMean(cast(id#0L as double)),mode=Partial,isDistinct=false)], output=[group_id#1L,count#14L,product#15])
      ConvertToSafe
       TungstenSort [group_id#1L ASC], false, 0
        TungstenProject [id#0L,(id#0L % 3) AS group_id#1L]
         Scan PhysicalRDD[id#0L]

Code Generation: true

scala> q.show()
+--------+---+
|group_id|_c1|
+--------+---+
|       0|0.0|
|       1|0.0|
|       2|0.0|
+--------+---+


scala> val q2 = sqlContext.sql("select group_id, gm(id) as geomean from simple group by group_id")
q2: org.apache.spark.sql.DataFrame = [group_id: bigint, geomean: double]

scala> q2.explain(true)
== Parsed Logical Plan ==
'Aggregate ['group_id], [unresolvedalias('group_id),unresolvedalias('gm('id) AS geomean#19)]
 'UnresolvedRelation [simple], None

== Analyzed Logical Plan ==
group_id: bigint, geomean: double
Aggregate [group_id#1L], [group_id#1L,(GeometricMean(cast(id#0L as double)),mode=Complete,isDistinct=false) AS geomean#19]
 Subquery simple
  Project [id#0L,(id#0L % cast(3 as bigint)) AS group_id#1L]
   Subquery ids
    LogicalRDD [id#0L], MapPartitionsRDD[3] at range at <console>:25

== Optimized Logical Plan ==
Aggregate [group_id#1L], [group_id#1L,(GeometricMean(cast(id#0L as double)),mode=Complete,isDistinct=false) AS geomean#19]
 Project [id#0L,(id#0L % 3) AS group_id#1L]
  LogicalRDD [id#0L], MapPartitionsRDD[3] at range at <console>:25

== Physical Plan ==
SortBasedAggregate(key=[group_id#1L], functions=[(GeometricMean(cast(id#0L as double)),mode=Final,isDistinct=false)], output=[group_id#1L,geomean#19])
 ConvertToSafe
  TungstenSort [group_id#1L ASC], false, 0
   TungstenExchange hashpartitioning(group_id#1L)
    ConvertToUnsafe
     SortBasedAggregate(key=[group_id#1L], functions=[(GeometricMean(cast(id#0L as double)),mode=Partial,isDistinct=false)], output=[group_id#1L,count#30L,product#31])
      ConvertToSafe
       TungstenSort [group_id#1L ASC], false, 0
        TungstenProject [id#0L,(id#0L % 3) AS group_id#1L]
         Scan PhysicalRDD[id#0L]

Code Generation: true

scala> q2.show()
+--------+-----------------+
|group_id|          geomean|
+--------+-----------------+
|       0|8.981385496571725|
|       1|7.301716979342118|
|       2|7.706253151292568|
+--------+-----------------+
{code}

And here is the UDAF spec:

{code:None|borderStyle=solid}
package com.pipeline.spark

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class GeometricMean extends UserDefinedAggregateFunction {

  // These are the input fields for your aggregate function.
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", DoubleType) :: Nil)

  // These are the internal fields you keep for computing your aggregate.
  def bufferSchema: StructType = StructType(
    StructField("count", LongType) ::
    StructField("product", DoubleType) :: Nil
  )

  // This is the output type of your aggregation function.
  def dataType: DataType = DoubleType

  // The function always returns the same output for the same input.
  def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }

  // This is how to update your buffer schema given an input.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Long](0) + 1
    buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
  }

  // This is how to merge two objects with the bufferSchema type.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
    buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
  }

  // This is where you output the final value, given the final value of your bufferSchema.
  def evaluate(buffer: Row): Any = {
    math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
  }
}
{code}

> UDAF result differs in SQL if alias is used
> -------------------------------------------
>
>                 Key: SPARK-12491
>                 URL: https://issues.apache.org/jira/browse/SPARK-12491
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.2
>            Reporter: Tristan
>
> Using the GeometricMean UDAF example
> (https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html),
> I found the following discrepancy in results:
>
> scala> sqlContext.sql("select group_id, gm(id) from simple group by group_id").show()
> +--------+---+
> |group_id|_c1|
> +--------+---+
> |       0|0.0|
> |       1|0.0|
> |       2|0.0|
> +--------+---+
>
> scala> sqlContext.sql("select group_id, gm(id) as GeometricMean from simple group by group_id").show()
> +--------+-----------------+
> |group_id|    GeometricMean|
> +--------+-----------------+
> |       0|8.981385496571725|
> |       1|7.301716979342118|
> |       2|7.706253151292568|
> +--------+-----------------+
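As a quick sanity check on which of the two results is correct: range(1, 20) produces ids 1..19, so id % 3 yields the groups {3, 6, 9, 12, 15, 18}, {1, 4, 7, 10, 13, 16, 19} and {2, 5, 8, 11, 14, 17}. Recomputing the geometric means in plain Scala, with no Spark involved, reproduces the values from the aliased query:

{code:None|borderStyle=solid}
// Standalone check of the expected geometric means for ids 1..19
// grouped by id % 3 (no Spark required).
val groups = (1L until 20L).groupBy(_ % 3)
groups.toSeq.sortBy(_._1).foreach { case (g, ids) =>
  val gm = math.pow(ids.map(_.toDouble).product, 1.0 / ids.size)
  println(s"group $g: $gm")
}
// Prints (up to floating-point rounding):
// group 0: 8.981385496571725
// group 1: 7.301716979342118
// group 2: 7.706253151292568
{code}

So the aliased query returns the mathematically correct values, and the 0.0 rows from the un-aliased query are wrong.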
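For reference, the Databricks blog post linked in the description invokes the same UDAF through the DataFrame API rather than through SQL text. A minimal sketch of that call path, assuming the df DataFrame from the REPL session above is in scope (I have not compared its output against the two SQL variants here):

{code:None|borderStyle=solid}
// Call the UDAF via the DataFrame API, following the blog post's example.
// UserDefinedAggregateFunction.apply(Column*) builds the aggregate Column.
import org.apache.spark.sql.functions.col

val gm = new GeometricMean
df.groupBy("group_id")
  .agg(gm(col("id")).as("GeometricMean"))
  .show()
{code}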