[ https://issues.apache.org/jira/browse/SPARK-12076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15816938#comment-15816938 ]

Hyukjin Kwon edited comment on SPARK-12076 at 1/11/17 2:43 AM:
---------------------------------------------------------------

Would you be able to try this in the current master or Spark 2.1?

It is painful to imagine and generate data that reproduces this issue with 
such a complex query, and even if someone like me manages to run it, I can't 
say the problem is correctly reproduced, because strictly speaking it is 
unknown whether the data was right. The SQL component has also changed rapidly 
since then and might now produce different plans. Worse, someone like me can't 
tell what exactly is incorrect in the output.

So, I guess we need to narrow the problem down to something that someone else 
can verify.
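For example, a self-contained check along these lines would give a concrete 
counterexample that anyone can verify. This is only a rough sketch: the column 
names are taken from the queries below, and {{slicePlayed}}/{{joinKeys}} (or a 
small extract of them) are assumed to still be available.

{code:scala}
import org.apache.spark.sql.functions._
import spark.implicits._

val joined = slicePlayed.join(
  joinKeys,
  $"join_session_id" === $"cdnt_session_id" &&
  $"join_asset_id" === $"cdnt_asset_id" &&
  $"join_euid" === $"cdnt_euid",
  "inner"
)

// countDistinct on its own ...
val narrow = joined
  .groupBy($"cdnt_session_id", $"cdnt_asset_id", $"cdnt_euid")
  .agg(countDistinct($"cdnt_slice_number").as("cd_narrow"))

// ... versus countDistinct next to the extra aggregates from the report.
val wide = joined
  .groupBy($"cdnt_session_id", $"cdnt_asset_id", $"cdnt_euid")
  .agg(
    min($"cdnt_event_time"),
    max($"cdnt_event_time"),
    countDistinct($"cdnt_slice_number").as("cd_wide"),
    count($"cdnt_slice_number"),
    min($"cdnt_slice_number"),
    max($"cdnt_slice_number"))

// Any row shown here is a group whose distinct count disagrees between the
// two aggregations, i.e. a minimal counterexample.
narrow.join(wide, Seq("cdnt_session_id", "cdnt_asset_id", "cdnt_euid"))
  .where($"cd_narrow" =!= $"cd_wide")
  .show(false)
{code}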

If you are not able to try this on the current master, we should resolve this 
either as {{Cannot Reproduce}}, because I guess no one can reproduce and 
verify it, or as {{Not A Problem}}, because that resolution "applies to issues 
or components that have changed radically since it was opened".





> countDistinct behaves inconsistently
> ------------------------------------
>
>                 Key: SPARK-12076
>                 URL: https://issues.apache.org/jira/browse/SPARK-12076
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: Paul Zaczkieiwcz
>            Priority: Minor
>
> Assume:
> {code:scala}
> // Placeholders for DataFrames whose construction is omitted here.
> val slicePlayed: DataFrame = ???
> val joinKeys: DataFrame = ???
> {code}
> Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}} 
> and all columns beginning with "join_" are from {{joinKeys}}. The following 
> queries can return different values for {{slice_count_distinct}}:
> {code:scala}
> slicePlayed.join(
>   joinKeys,
>   ( 
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number")
> ).show(false)
> {code}
> {code:scala}
> slicePlayed.join(
>   joinKeys,
>   ( 
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   min($"cdnt_event_time").as("slice_start_time"),
>   min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
>   min($"cdnt_user_ip").as("slice_played_user_ip"),
>   min($"cdnt_user_agent").as("slice_played_user_agent"),
>   min($"cdnt_referer").as("slice_played_referer"),
>   max($"cdnt_event_time").as("slice_end_time"),
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number"),
>   min($"cdnt_is_live").as("is_live")
> ).show(false)
> {code}
> The +only+ difference between the two queries is that I'm adding more 
> columns to the {{agg}} method.
> I can't reproduce this by manually creating a DataFrame from 
> {{DataFrame.parallelize}}. The original sources of the DataFrames are Parquet 
> files.
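> For context, "manually creating" means hand-building the inputs roughly like 
> this (via {{sc.parallelize}}/{{toDF}} in the shell; the values below are 
> invented for illustration and only cover a few of the real columns), which 
> did not show the mismatch:
> {code:scala}
> // Hypothetical hand-built inputs; the real DataFrames come from Parquet.
> import sqlContext.implicits._
> val slicePlayed = sc.parallelize(Seq(
>   ("s1", "a1", "u1", 1L),
>   ("s1", "a1", "u1", 2L)
> )).toDF("cdnt_session_id", "cdnt_asset_id", "cdnt_euid", "cdnt_slice_number")
> val joinKeys = sc.parallelize(Seq(
>   ("s1", "a1", "u1")
> )).toDF("join_session_id", "join_asset_id", "join_euid")
> {code}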
> The explain plans for the two queries are slightly different.
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
>  TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>   TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>    TungstenProject [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
>     SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
>      TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
>       TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>        ConvertToUnsafe
>         Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]
>      TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
>       TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
>        ConvertToUnsafe
>         Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> {code}
> == Physical Plan ==
> SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(max(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_is_live#18),mode=Final,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_user_ip#31),mode=Final,isDistinct=false),(min(cdnt_user_agent#30),mode=Final,isDistinct=false),(min(cdnt_referer#22),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#721,slice_played_asset_id#722,slice_played_euid#723,slice_start_time#724,slice_played_playing_owner_id#725,slice_played_user_ip#726,slice_played_user_agent#727,slice_played_referer#728,slice_end_time#729,slice_count_distinct#730L,slice_count_total#731L,min_slice_number#732L,max_slice_number#733L,is_live#734])
>  SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_is_live#18),mode=PartialMerge,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_user_ip#31),mode=PartialMerge,isDistinct=false),(min(cdnt_user_agent#30),mode=PartialMerge,isDistinct=false),(min(cdnt_referer#22),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
>   SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_is_live#18),mode=Partial,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_user_ip#31),mode=Partial,isDistinct=false),(min(cdnt_user_agent#30),mode=Partial,isDistinct=false),(min(cdnt_referer#22),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
>    ConvertToSafe
>     TungstenProject [cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
>      SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
>       TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
>        TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>         ConvertToUnsafe
>          Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
>       TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
>        TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
>         ConvertToUnsafe
>          Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> The biggest difference between the two plans is whether TungstenAggregate is 
> used or whether SortBasedAggregate+ConvertToSafe is used. The 
> SortBasedAggregate+ConvertToSafe plan gives the inaccurate results. I've been 
> able to work around this issue by adding a {{sortBy}} call before the 
> {{groupBy}} clause, but it strikes me that the result of this calculation 
> shouldn't change just because a manual sort is added in an intermediate step.
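> For concreteness, the workaround looks roughly like this (a sketch only: 
> {{sort}} is assumed to be the call meant by {{sortBy}} above, and sorting on 
> the grouping keys is an assumption):
> {code:scala}
> slicePlayed.join(
>   joinKeys,
>   $"join_session_id" === $"cdnt_session_id" &&
>   $"join_asset_id" === $"cdnt_asset_id" &&
>   $"join_euid" === $"cdnt_euid",
>   "inner"
> ).sort(
>   // Explicit sort inserted before the aggregation (the workaround above).
>   $"cdnt_session_id", $"cdnt_asset_id", $"cdnt_euid"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   min($"cdnt_event_time").as("slice_start_time"),
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total")
> ).show(false)
> {code}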


