[ 
https://issues.apache.org/jira/browse/SPARK-12491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15072902#comment-15072902
 ] 

Tristan commented on SPARK-12491:
---------------------------------

REPL with logical plans:

{code:None|borderStyle=solid}
scala> import com.pipeline.spark._
import com.pipeline.spark._

scala> sqlContext.udf.register("gm", new GeometricMean)
res0: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = 
com.pipeline.spark.GeometricMean@497031ea

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val ids = sqlContext.range(1, 20)
ids: org.apache.spark.sql.DataFrame = [id: bigint]

scala> ids.registerTempTable("ids")

scala> val df = sqlContext.sql("select id, id % 3 as group_id from ids")
df: org.apache.spark.sql.DataFrame = [id: bigint, group_id: bigint]

scala> df.registerTempTable("simple")

scala> val q = sqlContext.sql("select group_id, gm(id) from simple group by 
group_id")
q: org.apache.spark.sql.DataFrame = [group_id: bigint, _c1: double]

scala> q.explain(true)
== Parsed Logical Plan ==
'Aggregate ['group_id], [unresolvedalias('group_id),unresolvedalias('gm('id))]
 'UnresolvedRelation [simple], None

== Analyzed Logical Plan ==
group_id: bigint, _c1: double
Aggregate [group_id#1L], [group_id#1L,(GeometricMean(cast(id#0L as 
double)),mode=Complete,isDistinct=false) AS _c1#12]
 Subquery simple
  Project [id#0L,(id#0L % cast(3 as bigint)) AS group_id#1L]
   Subquery ids
    LogicalRDD [id#0L], MapPartitionsRDD[3] at range at <console>:25

== Optimized Logical Plan ==
Aggregate [group_id#1L], [group_id#1L,(GeometricMean(cast(id#0L as 
double)),mode=Complete,isDistinct=false) AS _c1#12]
 Project [id#0L,(id#0L % 3) AS group_id#1L]
  LogicalRDD [id#0L], MapPartitionsRDD[3] at range at <console>:25

== Physical Plan ==
SortBasedAggregate(key=[group_id#1L], functions=[(GeometricMean(cast(id#0L as 
double)),mode=Final,isDistinct=false)], output=[group_id#1L,_c1#12])
 ConvertToSafe
  TungstenSort [group_id#1L ASC], false, 0
   TungstenExchange hashpartitioning(group_id#1L)
    ConvertToUnsafe
     SortBasedAggregate(key=[group_id#1L], functions=[(GeometricMean(cast(id#0L 
as double)),mode=Partial,isDistinct=false)], 
output=[group_id#1L,count#14L,product#15])
      ConvertToSafe
       TungstenSort [group_id#1L ASC], false, 0
        TungstenProject [id#0L,(id#0L % 3) AS group_id#1L]
         Scan PhysicalRDD[id#0L]

Code Generation: true

scala> q.show()
+--------+---+
|group_id|_c1|
+--------+---+
|       0|0.0|
|       1|0.0|
|       2|0.0|
+--------+---+


scala> val q2 = sqlContext.sql("select group_id, gm(id) as geomean from simple 
group by group_id")
q2: org.apache.spark.sql.DataFrame = [group_id: bigint, geomean: double]

scala> q2.explain(true)
== Parsed Logical Plan ==
'Aggregate ['group_id], [unresolvedalias('group_id),unresolvedalias('gm('id) AS 
geomean#19)]
 'UnresolvedRelation [simple], None

== Analyzed Logical Plan ==
group_id: bigint, geomean: double
Aggregate [group_id#1L], [group_id#1L,(GeometricMean(cast(id#0L as 
double)),mode=Complete,isDistinct=false) AS geomean#19]
 Subquery simple
  Project [id#0L,(id#0L % cast(3 as bigint)) AS group_id#1L]
   Subquery ids
    LogicalRDD [id#0L], MapPartitionsRDD[3] at range at <console>:25

== Optimized Logical Plan ==
Aggregate [group_id#1L], [group_id#1L,(GeometricMean(cast(id#0L as 
double)),mode=Complete,isDistinct=false) AS geomean#19]
 Project [id#0L,(id#0L % 3) AS group_id#1L]
  LogicalRDD [id#0L], MapPartitionsRDD[3] at range at <console>:25

== Physical Plan ==
SortBasedAggregate(key=[group_id#1L], functions=[(GeometricMean(cast(id#0L as 
double)),mode=Final,isDistinct=false)], output=[group_id#1L,geomean#19])
 ConvertToSafe
  TungstenSort [group_id#1L ASC], false, 0
   TungstenExchange hashpartitioning(group_id#1L)
    ConvertToUnsafe
     SortBasedAggregate(key=[group_id#1L], functions=[(GeometricMean(cast(id#0L 
as double)),mode=Partial,isDistinct=false)], 
output=[group_id#1L,count#30L,product#31])
      ConvertToSafe
       TungstenSort [group_id#1L ASC], false, 0
        TungstenProject [id#0L,(id#0L % 3) AS group_id#1L]
         Scan PhysicalRDD[id#0L]

Code Generation: true

 
scala> q2.show()
+--------+-----------------+
|group_id|          geomean|
+--------+-----------------+
|       0|8.981385496571725|
|       1|7.301716979342118|
|       2|7.706253151292568|
+--------+-----------------+
{code}

And here is the UDAF spec:

{code:None|borderStyle=solid}
package com.pipeline.spark

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._


/**
 * User-defined aggregate function that computes the geometric mean of a
 * numeric column: the n-th root of the product of the n aggregated values.
 *
 * The aggregation buffer holds two fields: the number of values seen so far
 * (`count`) and their running product (`product`).
 *
 * NULL inputs are skipped, and a group without any non-NULL input evaluates
 * to NULL.
 */
class GeometricMean extends UserDefinedAggregateFunction {
  // Input fields of the aggregate function: a single double column.
  def inputSchema: StructType =
    StructType(StructField("value", DoubleType) :: Nil)

  // Internal fields kept while aggregating: (count, product).
  def bufferSchema: StructType = StructType(
    StructField("count", LongType) ::
    StructField("product", DoubleType) :: Nil
  )

  // Output type of the aggregation function.
  def dataType: DataType = DoubleType

  def deterministic: Boolean = true

  // Initial buffer: nothing counted yet, and 1.0 as the multiplicative
  // identity for the running product.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }

  // Folds one input row into the buffer.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Skip NULLs: reading a NULL through getAs[Double] unboxes to 0.0,
    // which would be counted and would zero the whole product.
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + 1
      buffer(1) = buffer.getDouble(1) * input.getDouble(0)
    }
  }

  // Merges two partial buffers: counts add up, products multiply.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getDouble(1) * buffer2.getDouble(1)
  }

  // Produces the final value from the final state of the buffer.
  def evaluate(buffer: Row): Any = {
    val count = buffer.getLong(0)
    // With nothing aggregated there is no mean; return NULL instead of
    // dividing by a zero count.
    if (count == 0L) null
    else math.pow(buffer.getDouble(1), 1.0 / count)
  }
}
{code}

> UDAF result differs in SQL if alias is used
> -------------------------------------------
>
>                 Key: SPARK-12491
>                 URL: https://issues.apache.org/jira/browse/SPARK-12491
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.2
>            Reporter: Tristan
>
> Using the GeometricMean UDAF example 
> (https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html),
>  I found the following discrepancy in results:
> scala> sqlContext.sql("select group_id, gm(id) from simple group by 
> group_id").show()
> +--------+---+
> |group_id|_c1|
> +--------+---+
> |       0|0.0|
> |       1|0.0|
> |       2|0.0|
> +--------+---+
> scala> sqlContext.sql("select group_id, gm(id) as GeometricMean from simple 
> group by group_id").show()
> +--------+-----------------+
> |group_id|    GeometricMean|
> +--------+-----------------+
> |       0|8.981385496571725|
> |       1|7.301716979342118|
> |       2|7.706253151292568|
> +--------+-----------------+



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to