[ 
https://issues.apache.org/jira/browse/SPARK-19032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15786883#comment-15786883
 ] 

Liang-Chi Hsieh edited comment on SPARK-19032 at 12/30/16 4:50 AM:
-------------------------------------------------------------------

I don't think the current API can guarantee the sort order within each group 
during an aggregation.

One workaround is to combine repartition and sortWithinPartitions, as I 
mentioned in the discussion.

{code}
df.repartition($"account").
  sortWithinPartitions($"account", $"probability".desc).
  groupBy($"account").
  agg(first($"product"), first($"probability"))
{code}

This should work, but the API still does not guarantee it. If the internal 
implementation of aggregation changes, the results may become 
non-deterministic again.
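
For comparison, here is a minimal sketch of a formulation that states the 
per-group ordering explicitly with a window function, rather than relying on 
how the aggregation happens to be executed. It assumes a spark-shell session 
where sc and the SQL implicits are already in scope, as in the example from the 
issue. On Spark 1.x, window functions need a Hive-enabled context, to my 
knowledge. The tie-breaker on product is an assumption added here so that rows 
with equal probability still rank deterministically.

```scala
// Sketch for a spark-shell session (Spark 1.6+), where `sc` and the SQL
// implicits (toDF, $"...") are already in scope, as in the issue's example.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val df = sc.parallelize(Seq(
  ("a", "prod1", 0.6), ("a", "prod2", 0.4), ("a", "prod2", 0.4)
)).toDF("account", "product", "probability")

// Rank rows inside each account by descending probability. `product` is a
// tie-breaker (an assumption, not part of the original query).
val byAccount = Window.partitionBy($"account").orderBy($"probability".desc, $"product")

// Keep only the top-ranked row per account.
val top = df.withColumn("rn", row_number().over(byAccount)).
  where($"rn" === 1).
  select($"account", $"product", $"probability")

top.show()
```

Because the ordering is part of the window specification itself, the selected 
row does not depend on the order in which partitions are processed.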



> Non-deterministic results using aggregation first across multiple workers
> -------------------------------------------------------------------------
>
>                 Key: SPARK-19032
>                 URL: https://issues.apache.org/jira/browse/SPARK-19032
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 1.6.1
>         Environment: Standalone Spark 1.6.1 cluster on EC2 with 2 worker 
> nodes, one executor each.
>            Reporter: Harry Weppner
>
> We've come across a situation where results aggregated using {{first}} on a 
> sorted df are non-deterministic. The query plan suggests a plausible 
> explanation, but it raises further questions about the usefulness of these 
> aggregation functions in a Spark cluster.
> Here's a minimal example to reproduce:
> {code}
> val df = 
> sc.parallelize(Seq(("a","prod1",0.6),("a","prod2",0.4),("a","prod2",0.4),("a","prod2",0.4),("a","prod2",0.4))).toDF("account","product","probability")
> var p = 
> df.sort($"probability".desc).groupBy($"account").agg(first($"product"),first($"probability")).show();
> +-------+----------------+--------------------+
> |account|first(product)()|first(probability)()|
> +-------+----------------+--------------------+
> |      a|           prod1|                 0.6|
> +-------+----------------+--------------------+
> p: Unit = ()
> // Repeat and notice that result will occasionally be different
> +-------+----------------+--------------------+
> |account|first(product)()|first(probability)()|
> +-------+----------------+--------------------+
> |      a|           prod2|                 0.4|
> +-------+----------------+--------------------+
> p: Unit = ()
> scala> 
> df.sort($"probability".desc).groupBy($"account").agg(first($"product"),first($"probability")).explain(true);
> == Parsed Logical Plan ==
> 'Aggregate ['account], 
> [unresolvedalias('account),(first('product)(),mode=Complete,isDistinct=false) 
> AS 
> first(product)()#523,(first('probability)(),mode=Complete,isDistinct=false) 
> AS first(probability)()#524]
> +- Sort [probability#5 DESC], true
>    +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
>       +- LogicalRDD [_1#0,_2#1,_3#2], MapPartitionsRDD[1] at 
> rddToDataFrameHolder at <console>:27
> == Analyzed Logical Plan ==
> account: string, first(product)(): string, first(probability)(): double
> Aggregate [account#3], 
> [account#3,(first(product#4)(),mode=Complete,isDistinct=false) AS 
> first(product)()#523,(first(probability#5)(),mode=Complete,isDistinct=false) 
> AS first(probability)()#524]
> +- Sort [probability#5 DESC], true
>    +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
>       +- LogicalRDD [_1#0,_2#1,_3#2], MapPartitionsRDD[1] at 
> rddToDataFrameHolder at <console>:27
> == Optimized Logical Plan ==
> Aggregate [account#3], 
> [account#3,(first(product#4)(),mode=Complete,isDistinct=false) AS 
> first(product)()#523,(first(probability#5)(),mode=Complete,isDistinct=false) 
> AS first(probability)()#524]
> +- Sort [probability#5 DESC], true
>    +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
>       +- LogicalRDD [_1#0,_2#1,_3#2], MapPartitionsRDD[1] at 
> rddToDataFrameHolder at <console>:27
> == Physical Plan ==
> SortBasedAggregate(key=[account#3], 
> functions=[(first(product#4)(),mode=Final,isDistinct=false),(first(probability#5)(),mode=Final,isDistinct=false)],
>  output=[account#3,first(product)()#523,first(probability)()#524])
> +- ConvertToSafe
>    +- Sort [account#3 ASC], false, 0
>       +- TungstenExchange hashpartitioning(account#3,200), None
>          +- ConvertToUnsafe
>             +- SortBasedAggregate(key=[account#3], 
> functions=[(first(product#4)(),mode=Partial,isDistinct=false),(first(probability#5)(),mode=Partial,isDistinct=false)],
>  output=[account#3,first#532,valueSet#533,first#534,valueSet#535])
>                +- ConvertToSafe
>                   +- Sort [account#3 ASC], false, 0
>                      +- Sort [probability#5 DESC], true, 0
>                         +- ConvertToUnsafe
>                            +- Exchange rangepartitioning(probability#5 
> DESC,200), None
>                               +- ConvertToSafe
>                                  +- Project [_1#0 AS account#3,_2#1 AS 
> product#4,_3#2 AS probability#5]
>                                     +- Scan ExistingRDD[_1#0,_2#1,_3#2]
> {code}
> My working hypothesis is that after {{TungstenExchange hashpartitioning}} the 
> _global_ sort order on {{probability}} is lost, leading to non-deterministic 
> results.
> If this hypothesis is valid, how useful are aggregation functions such as 
> {{first}}, {{last}}, and possibly others in Spark?
> It appears that window functions could resolve the ambiguity by making the 
> partitions explicit, but I'd be interested in your assessment. 
> Thanks!
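
The hypothesis in the quoted description can be illustrated with a small 
plain-Scala model. This is not Spark's implementation. It is only a sketch that 
assumes the rows ended up in two upstream partitions, each emitting its own 
partial result, and that the final step keeps whichever partial result reaches 
it first. The names Partial and finalFirst are hypothetical.

```scala
// Plain-Scala model (no Spark): each upstream partition has already computed
// its own partial `first` for account "a".
case class Partial(account: String, product: String, probability: Double)

val partials = Seq(Partial("a", "prod1", 0.6), Partial("a", "prod2", 0.4))

// Model of the final step: it keeps whichever partial result arrives first.
def finalFirst(arrival: Seq[Partial]): Partial = arrival.head

val run1 = finalFirst(partials)         // the prod1 partial arrives first
val run2 = finalFirst(partials.reverse) // the prod2 partial arrives first

println(run1)
println(run2)
```

Under this model the answer depends only on arrival order after the exchange, 
which matches the occasional prod2 result shown in the report.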


