[ https://issues.apache.org/jira/browse/SPARK-12076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen updated SPARK-12076:
-------------------------------
    Component/s:     (was: Spark Core)
                 SQL

countDistinct behaves inconsistently
------------------------------------

                Key: SPARK-12076
                URL: https://issues.apache.org/jira/browse/SPARK-12076
            Project: Spark
         Issue Type: Bug
         Components: SQL
   Affects Versions: 1.5.1
           Reporter: Paul Zaczkieiwcz
           Priority: Minor

Assume:
{code:java}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._ // countDistinct, count, min, max
val slicePlayed: DataFrame = ??? // placeholder: the Parquet-backed input
val joinKeys: DataFrame = ???    // placeholder: the join-key input
{code}
Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}} and all columns beginning with "join_" are from {{joinKeys}}. The following queries can return different values for {{slice_count_distinct}}:
{code:java}
slicePlayed.join(
  joinKeys,
  (
    $"join_session_id" === $"cdnt_session_id" &&
    $"join_asset_id" === $"cdnt_asset_id" &&
    $"join_euid" === $"cdnt_euid"
  ),
  "inner"
).groupBy(
  $"cdnt_session_id".as("slice_played_session_id"),
  $"cdnt_asset_id".as("slice_played_asset_id"),
  $"cdnt_euid".as("slice_played_euid")
).agg(
  countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
  count($"cdnt_slice_number").as("slice_count_total"),
  min($"cdnt_slice_number").as("min_slice_number"),
  max($"cdnt_slice_number").as("max_slice_number")
).show(false)
{code}
{code:java}
slicePlayed.join(
  joinKeys,
  (
    $"join_session_id" === $"cdnt_session_id" &&
    $"join_asset_id" === $"cdnt_asset_id" &&
    $"join_euid" === $"cdnt_euid"
  ),
  "inner"
).groupBy(
  $"cdnt_session_id".as("slice_played_session_id"),
  $"cdnt_asset_id".as("slice_played_asset_id"),
  $"cdnt_euid".as("slice_played_euid")
).agg(
  min($"cdnt_event_time").as("slice_start_time"),
  min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
  min($"cdnt_user_ip").as("slice_played_user_ip"),
  min($"cdnt_user_agent").as("slice_played_user_agent"),
  min($"cdnt_referer").as("slice_played_referer"),
  max($"cdnt_event_time").as("slice_end_time"),
  countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
  count($"cdnt_slice_number").as("slice_count_total"),
  min($"cdnt_slice_number").as("min_slice_number"),
  max($"cdnt_slice_number").as("max_slice_number"),
  min($"cdnt_is_live").as("is_live")
).show(false)
{code}
The +only+ difference between the two queries is that the second one passes more columns to the {{agg}} method.

I can't reproduce this with a DataFrame created manually via {{parallelize}}. The original sources of the DataFrames are Parquet files.

The explain plans for the two queries (first query, then second) are slightly different:
{code}
== Physical Plan ==
TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
 TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
  TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
   TungstenProject [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
    SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
     TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
      TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
       ConvertToUnsafe
        Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]
     TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
      TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
       ConvertToUnsafe
        Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
{code}
{code}
== Physical Plan ==
SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(max(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_is_live#18),mode=Final,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_user_ip#31),mode=Final,isDistinct=false),(min(cdnt_user_agent#30),mode=Final,isDistinct=false),(min(cdnt_referer#22),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#721,slice_played_asset_id#722,slice_played_euid#723,slice_start_time#724,slice_played_playing_owner_id#725,slice_played_user_ip#726,slice_played_user_agent#727,slice_played_referer#728,slice_end_time#729,slice_count_distinct#730L,slice_count_total#731L,min_slice_number#732L,max_slice_number#733L,is_live#734])
 SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_is_live#18),mode=PartialMerge,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_user_ip#31),mode=PartialMerge,isDistinct=false),(min(cdnt_user_agent#30),mode=PartialMerge,isDistinct=false),(min(cdnt_referer#22),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
  SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_is_live#18),mode=Partial,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_user_ip#31),mode=Partial,isDistinct=false),(min(cdnt_user_agent#30),mode=Partial,isDistinct=false),(min(cdnt_referer#22),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
   ConvertToSafe
    TungstenProject [cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
     SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
      TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
       TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
        ConvertToUnsafe
         Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
      TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
       TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
        ConvertToUnsafe
         Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
{code}
The biggest difference between the two plans is whether TungstenAggregate or SortBasedAggregate+ConvertToSafe is used. The SortBasedAggregate+ConvertToSafe plan is the one that gives the inaccurate results. I've been able to work around this issue by adding a sort before the {{groupBy}} call. However, adding a manual sort in an intermediate step shouldn't change the result of this calculation.
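Both plans evaluate {{countDistinct}} using the same two-phase strategy. The lower aggregates group on the three key columns plus {{cdnt_slice_number}} (modes {{Partial}} and {{PartialMerge}}). The top aggregate groups on the three key columns alone and runs {{count(cdnt_slice_number)}} with {{isDistinct=true}} in {{Complete}} mode. Each lower-level group contributes exactly one slice number for its key, so the distinct count should not depend on which other aggregates share the {{agg}} call.

The following plain-Scala sketch is a simplified, single-machine model of that strategy, not Spark's implementation. It assumes non-null slice numbers, and all of its names ({{SliceEvent}}, {{Key}}, {{Stats}}, {{TwoPhaseDistinct}}) are hypothetical. It checks the two-phase result against the direct per-group definition:

{code:java}
// Simplified single-machine model of the two-phase distinct aggregation seen in
// both plans. All names are hypothetical; slice numbers are assumed non-null
// (Spark's count/countDistinct skip nulls, which this model does not handle).
final case class SliceEvent(sessionId: String, assetId: String, euid: String, sliceNumber: Long)
final case class Key(sessionId: String, assetId: String, euid: String)
final case class Stats(countDistinct: Long, countTotal: Long, min: Long, max: Long)

object TwoPhaseDistinct {
  private def keyOf(e: SliceEvent): Key = Key(e.sessionId, e.assetId, e.euid)

  // Reference semantics: what countDistinct/count/min/max should return per group.
  def direct(events: Seq[SliceEvent]): Map[Key, Stats] =
    events.groupBy(keyOf).map { case (k, es) =>
      val s = es.map(_.sliceNumber)
      k -> Stats(s.distinct.size.toLong, s.size.toLong, s.min, s.max)
    }

  // Phase 1 models the Partial/PartialMerge aggregates: group on
  // (key, sliceNumber) and keep partial (count, min, max) values per group.
  // Phase 2 models the Final aggregate: group on the key alone. Each phase-1
  // group stands for exactly one distinct sliceNumber, so counting the groups
  // gives the distinct count.
  def twoPhase(events: Seq[SliceEvent]): Map[Key, Stats] = {
    val phase1: Map[(Key, Long), (Long, Long, Long)] =
      events.groupBy(e => (keyOf(e), e.sliceNumber)).map { case (kv, es) =>
        val s = es.map(_.sliceNumber)
        kv -> ((s.size.toLong, s.min, s.max))
      }
    phase1.groupBy { case ((k, _), _) => k }.map { case (k, parts) =>
      val partials = parts.values
      k -> Stats(
        countDistinct = parts.size.toLong,
        countTotal = partials.map(_._1).sum,
        min = partials.map(_._2).min,
        max = partials.map(_._3).max
      )
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      SliceEvent("s1", "a1", "u1", 1L),
      SliceEvent("s1", "a1", "u1", 1L),
      SliceEvent("s1", "a1", "u1", 2L),
      SliceEvent("s2", "a1", "u1", 5L)
    )
    // The two formulations must agree on every group.
    assert(direct(events) == twoPhase(events))
    println(twoPhase(events)(Key("s1", "a1", "u1"))) // prints Stats(2,3,1,2)
  }
}
{code}

The model uses in-memory maps, so it ignores partitioning and the {{ConvertToSafe}}/{{ConvertToUnsafe}} row conversions that distinguish the second plan. It can therefore describe the expected value of {{slice_count_distinct}}, but it cannot reproduce the bug itself.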