[ 
https://issues.apache.org/jira/browse/SPARK-11885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davies Liu reassigned SPARK-11885:
----------------------------------

    Assignee: Davies Liu  (was: Yin Huai)

> UDAF may nondeterministically generate wrong results
> ----------------------------------------------------
>
>                 Key: SPARK-11885
>                 URL: https://issues.apache.org/jira/browse/SPARK-11885
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.2
>            Reporter: Yin Huai
>            Assignee: Davies Liu
>            Priority: Critical
>
> I could not reproduce it in 1.6 branch (it can be easily reproduced in 1.5). 
> I think it is an issue in 1.5 branch.
> Try the following in spark 1.5 (with a cluster) and you can see the problem.
> {code}
> import java.math.BigDecimal
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StructType, StructField, DataType, 
> DoubleType, LongType}
> class GeometricMean extends UserDefinedAggregateFunction {
>   def inputSchema: StructType =
>     StructType(StructField("value", DoubleType) :: Nil)
>   def bufferSchema: StructType = StructType(
>     StructField("count", LongType) ::
>       StructField("product", DoubleType) :: Nil
>   )
>   def dataType: DataType = DoubleType
>   def deterministic: Boolean = true
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = 0L
>     buffer(1) = 1.0
>   }
>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
>     buffer(0) = buffer.getAs[Long](0) + 1
>     buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
>   }
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
>     buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
>   }
>   def evaluate(buffer: Row): Any = {
>     math.pow(buffer.getDouble(1), 1.0d / buffer.getLong(0))
>   }
> }
> sqlContext.udf.register("gm", new GeometricMean)
> val df = Seq(
>   (1, "italy", "emilia", 42, BigDecimal.valueOf(100, 0), "john"),
>   (2, "italy", "toscana", 42, BigDecimal.valueOf(505, 1), "jim"),
>   (3, "italy", "puglia", 42, BigDecimal.valueOf(70, 0), "jenn"),
>   (4, "italy", "emilia", 42, BigDecimal.valueOf(75 ,0), "jack"),
>   (5, "uk", "london", 42, BigDecimal.valueOf(200 ,0), "carl"),
>   (6, "italy", "emilia", 42, BigDecimal.valueOf(42, 0), "john")).
>   toDF("receipt_id", "store_country", "store_region", "store_id", "amount", 
> "seller_name")
> df.registerTempTable("receipts")
>   
> val q = sql("""
> select   store_country,
>          store_region,
>          avg(amount),
>          sum(amount),
>          gm(amount)
> from     receipts
> where    amount > 50
>          and store_country = 'italy'
> group by store_country, store_region
> """)
> q.show
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to