[ https://issues.apache.org/jira/browse/SPARK-19032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15796603#comment-15796603 ]
Harry Weppner commented on SPARK-19032:
---------------------------------------

[~srowen] thanks for clarifying the intended semantics. I'm having a hard time thinking of valid scenarios in which a {{first}} or {{last}} aggregation function would yield deterministic results in general.

> Non-deterministic results using aggregation first across multiple workers
> -------------------------------------------------------------------------
>
>                 Key: SPARK-19032
>                 URL: https://issues.apache.org/jira/browse/SPARK-19032
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 1.6.1
>         Environment: Standalone Spark 1.6.1 cluster on EC2 with 2 worker nodes, one executor each.
>            Reporter: Harry Weppner
>
> We've come across a situation where results aggregated using {{first}} on a sorted DataFrame are non-deterministic. The query plan suggests a plausible explanation, but it raises further questions about the usefulness of these aggregation functions in a Spark cluster.
> Here's a minimal example to reproduce:
> {code}
> val df = sc.parallelize(Seq(("a","prod1",0.6),("a","prod2",0.4),("a","prod2",0.4),("a","prod2",0.4),("a","prod2",0.4))).toDF("account","product","probability")
> var p = df.sort($"probability".desc).groupBy($"account").agg(first($"product"),first($"probability")).show();
> +-------+----------------+--------------------+
> |account|first(product)()|first(probability)()|
> +-------+----------------+--------------------+
> |      a|           prod1|                 0.6|
> +-------+----------------+--------------------+
> p: Unit = ()
> // Repeat and notice that the result will occasionally be different
> +-------+----------------+--------------------+
> |account|first(product)()|first(probability)()|
> +-------+----------------+--------------------+
> |      a|           prod2|                 0.4|
> +-------+----------------+--------------------+
> p: Unit = ()
> scala> df.sort($"probability".desc).groupBy($"account").agg(first($"product"),first($"probability")).explain(true);
> == Parsed Logical Plan ==
> 'Aggregate ['account], [unresolvedalias('account),(first('product)(),mode=Complete,isDistinct=false) AS first(product)()#523,(first('probability)(),mode=Complete,isDistinct=false) AS first(probability)()#524]
> +- Sort [probability#5 DESC], true
>    +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
>       +- LogicalRDD [_1#0,_2#1,_3#2], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:27
> == Analyzed Logical Plan ==
> account: string, first(product)(): string, first(probability)(): double
> Aggregate [account#3], [account#3,(first(product#4)(),mode=Complete,isDistinct=false) AS first(product)()#523,(first(probability#5)(),mode=Complete,isDistinct=false) AS first(probability)()#524]
> +- Sort [probability#5 DESC], true
>    +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
>       +- LogicalRDD [_1#0,_2#1,_3#2], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:27
> == Optimized Logical Plan ==
> Aggregate [account#3], [account#3,(first(product#4)(),mode=Complete,isDistinct=false) AS first(product)()#523,(first(probability#5)(),mode=Complete,isDistinct=false) AS first(probability)()#524]
> +- Sort [probability#5 DESC], true
>    +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
>       +- LogicalRDD [_1#0,_2#1,_3#2], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:27
> == Physical Plan ==
> SortBasedAggregate(key=[account#3], functions=[(first(product#4)(),mode=Final,isDistinct=false),(first(probability#5)(),mode=Final,isDistinct=false)], output=[account#3,first(product)()#523,first(probability)()#524])
> +- ConvertToSafe
>    +- Sort [account#3 ASC], false, 0
>       +- TungstenExchange hashpartitioning(account#3,200), None
>          +- ConvertToUnsafe
>             +- SortBasedAggregate(key=[account#3], functions=[(first(product#4)(),mode=Partial,isDistinct=false),(first(probability#5)(),mode=Partial,isDistinct=false)], output=[account#3,first#532,valueSet#533,first#534,valueSet#535])
>                +- ConvertToSafe
>                   +- Sort [account#3 ASC], false, 0
>                      +- Sort [probability#5 DESC], true, 0
>                         +- ConvertToUnsafe
>                            +- Exchange rangepartitioning(probability#5 DESC,200), None
>                               +- ConvertToSafe
>                                  +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
>                                     +- Scan ExistingRDD[_1#0,_2#1,_3#2]
> {code}
> My working hypothesis is that after the {{TungstenExchange hashpartitioning}} the _global_ sort order on {{probability}} is lost, leading to non-deterministic results.
> If this hypothesis is valid, then how useful are aggregation functions such as {{first}}, {{last}} and possibly others in Spark?
> It appears that the use of window functions could address the ambiguity by making the partitions explicit, but I'd be interested in your assessment. Thanks!
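For reference, here is a minimal sketch of the window-function alternative mentioned in the description. It assumes a Spark 1.6 shell whose {{sqlContext}} supports window functions (i.e. a {{HiveContext}}, which the standard Hive-enabled build provides); the names {{byProb}}, {{top}} and the helper column {{rn}} are illustrative only.

{code}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Unlike groupBy + first, the partitioning and ordering are explicit here:
// rank each account's rows by descending probability...
val byProb = Window.partitionBy($"account").orderBy($"probability".desc)

// ...and keep only the top-ranked row per account.
val top = df.withColumn("rn", row_number().over(byProb))
  .where($"rn" === 1)
  .drop("rn")

top.show()
// +-------+-------+-----------+
// |account|product|probability|
// +-------+-------+-----------+
// |      a|  prod1|        0.6|
// +-------+-------+-----------+
{code}

Note that {{row_number}} still breaks ties arbitrarily, so this is only deterministic up to ties on {{probability}}; adding a tiebreaker column (e.g. {{product}}) to the {{orderBy}} would make the ordering total and the result fully deterministic.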