[ https://issues.apache.org/jira/browse/SPARK-11885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Davies Liu reassigned SPARK-11885:
----------------------------------

    Assignee: Davies Liu  (was: Yin Huai)

> UDAF may nondeterministically generate wrong results
> ----------------------------------------------------
>
>                 Key: SPARK-11885
>                 URL: https://issues.apache.org/jira/browse/SPARK-11885
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.2
>            Reporter: Yin Huai
>            Assignee: Davies Liu
>            Priority: Critical
>
> I could not reproduce it in 1.6 branch (it can be easily reproduced in 1.5).
> I think it is an issue in 1.5 branch.
> Try the following in spark 1.5 (with a cluster) and you can see the problem.
> {code}
> import java.math.BigDecimal
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StructType, StructField, DataType, DoubleType, LongType}
>
> class GeometricMean extends UserDefinedAggregateFunction {
>   def inputSchema: StructType =
>     StructType(StructField("value", DoubleType) :: Nil)
>
>   def bufferSchema: StructType = StructType(
>     StructField("count", LongType) ::
>       StructField("product", DoubleType) :: Nil
>   )
>
>   def dataType: DataType = DoubleType
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = 0L
>     buffer(1) = 1.0
>   }
>
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     buffer(0) = buffer.getAs[Long](0) + 1
>     buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
>     buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
>   }
>
>   def evaluate(buffer: Row): Any = {
>     math.pow(buffer.getDouble(1), 1.0d / buffer.getLong(0))
>   }
> }
>
> sqlContext.udf.register("gm", new GeometricMean)
>
> val df = Seq(
>   (1, "italy", "emilia", 42, BigDecimal.valueOf(100, 0), "john"),
>   (2, "italy", "toscana", 42, BigDecimal.valueOf(505, 1), "jim"),
>   (3, "italy", "puglia", 42, BigDecimal.valueOf(70, 0), "jenn"),
>   (4, "italy", "emilia", 42, BigDecimal.valueOf(75, 0), "jack"),
>   (5, "uk", "london", 42, BigDecimal.valueOf(200, 0), "carl"),
>   (6, "italy", "emilia", 42, BigDecimal.valueOf(42, 0), "john")).
>   toDF("receipt_id", "store_country", "store_region", "store_id", "amount", "seller_name")
> df.registerTempTable("receipts")
>
> val q = sql("""
>   select store_country,
>          store_region,
>          avg(amount),
>          sum(amount),
>          gm(amount)
>   from receipts
>   where amount > 50
>     and store_country = 'italy'
>   group by store_country, store_region
> """)
> q.show
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
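
To judge whether a given run of the query above produced a wrong gm(amount) column, it may help to have the expected values on hand. The following is a minimal sketch in plain Scala (no Spark) that applies the same filter and grouping to the six rows from the report and computes the geometric mean as product ^ (1 / count), the same formula the UDAF uses. The names ExpectedGm, receipts, geometricMean and expected are hypothetical and are not part of the original report.

```scala
import java.math.BigDecimal

// Hypothetical helper, not part of the report: a Spark-free reference
// computation of what gm(amount) should return for the query in the report.
object ExpectedGm {
  // The six rows from the report:
  // (receipt_id, store_country, store_region, store_id, amount, seller_name)
  val receipts: Seq[(Int, String, String, Int, BigDecimal, String)] = Seq(
    (1, "italy", "emilia", 42, BigDecimal.valueOf(100, 0), "john"),
    (2, "italy", "toscana", 42, BigDecimal.valueOf(505, 1), "jim"),
    (3, "italy", "puglia", 42, BigDecimal.valueOf(70, 0), "jenn"),
    (4, "italy", "emilia", 42, BigDecimal.valueOf(75, 0), "jack"),
    (5, "uk", "london", 42, BigDecimal.valueOf(200, 0), "carl"),
    (6, "italy", "emilia", 42, BigDecimal.valueOf(42, 0), "john")
  )

  // Geometric mean as the UDAF defines it: product ^ (1 / count).
  def geometricMean(values: Seq[Double]): Double =
    math.pow(values.product, 1.0d / values.size)

  // Mirror the query: amount > 50 and store_country = 'italy',
  // grouped by store_region (store_country is constant after the filter).
  val expected: Map[String, Double] = receipts
    .filter { case (_, country, _, _, amount, _) =>
      country == "italy" && amount.doubleValue > 50
    }
    .groupBy(_._3)
    .map { case (region, rows) =>
      region -> geometricMean(rows.map(_._5.doubleValue))
    }

  def main(args: Array[String]): Unit =
    expected.toSeq.sortBy(_._1).foreach { case (region, gm) =>
      println(f"$region%-8s $gm%.4f")
    }
}
```

Under these assumptions, emilia keeps two rows (100 and 75, since 42 is filtered out), so its geometric mean is the square root of 7500; toscana and puglia each keep a single row, so their geometric mean equals that row's amount (50.5 and 70). A run on 1.5 that shows other values for gm(amount) would be an instance of the problem described in the report.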