Does it still plan an inner join if you remove the filters on both tables?

It seems like you are asking for a left join, but your filters demand the
behavior of an inner join: any row from s with no match in p comes back with
NULLs for p.year/p.month/p.day, the WHERE clause then throws those rows away,
so the optimizer is free to rewrite the outer join as an inner join.

Maybe you could apply the filters to each table first and then join the filtered results.

Something roughly like this:

import org.apache.spark.sql.functions.col

val sFiltered = s_DF.filter(col("year") === 2020 && col("month") === 4 && col("day") === 29)
val pFiltered = p_DF.filter(col("year") === 2020 && col("month") === 4 && col("day") === 29 && col("event_id").isNull)

val output = sFiltered.join(pFiltered, sFiltered("event_id") === pFiltered("source_event_id"), "left_outer")
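
To double-check, you could print the plan for that version; a minimal sketch,
assuming the sFiltered/pFiltered/output names from the snippet above:

// The plan should keep the LeftOuter join (broadcast or sort-merge) and show
// PartitionFilters on year/month/day, instead of being rewritten to Inner.
output.explain(true)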



On Thu, Apr 30, 2020 at 11:06 AM Roland Johann
<roland.joh...@phenetic.io.invalid> wrote:

> Hi All,
>
>
> we are on vanilla Spark 2.4.4 and currently experience somewhat strange
> behavior of the query planner/optimizer, and therefore get wrong results.
>
> select
>     s.event_id as search_event_id,
>     s.query_string,
>     p.event_id
> from s
> left outer join p on s.event_id = p.source_event_id
> where
>     s.year = 2020 and s.month = 4 and s.day = 29
>     and p.year = 2020 and p.month = 4 and p.day = 29
> limit 1
>
> This query leads to this plan:
>
> *(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, event_id#12209]
> +- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, BuildLeft
>    :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
>    :  +- *(1) Project [event_id#12131, query_string#12178]
>    :     +- *(1) Filter isnotnull(event_id#12131)
>    :        +- *(1) FileScan parquet s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/search/year=2020/month=4/day=29/..., PartitionCount: 1, PartitionFilters: [isnotnull(year#12194), isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), (month..., PushedFilters: [IsNotNull(event_id)], ReadSchema: struct<event_id:string,query_string:string>
>    +- *(2) Project [event_id#12209, source_event_id#12221]
>       +- *(2) Filter isnotnull(source_event_id#12221)
>          +- *(2) FileScan parquet s[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/p/year=2020/month=4/day=2..., PartitionCount: 1, PartitionFilters: [isnotnull(day#12310), isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), (month..., PushedFilters: [IsNotNull(source_event_id)], ReadSchema: struct<event_id:string,source_event_id:string>
>
> Without partition pruning the join gets planned as LeftOuter with a
> SortMergeJoin, but we need partition pruning in this case to prevent full
> table scans and to benefit from the broadcast join.
>
> As soon as we rewrite the query with the Scala DSL, the plan looks fine:
>
> val s = spark.sql("select event_id, query_string from ssi_kpi.search where year = 2020 and month = 4 and day = 29")
> val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show where year = 2020 and month = 4 and day = 29")
>
> s
>   .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
>   .groupBy(s("query_string"))
>   .agg(count(s("query_string")), count(p("event_id")))
>   .show()
>
>
>
> The second thing we saw is that conditions in the where clause of joined
> tables get pushed down to the parquet files and lead to wrong results, for
> example:
>
> select
>     s.event_id as search_event_id,
>     s.query_string,
>     p.event_id
> from s
> left outer join p on s.event_id = p.source_event_id
> where
>     s.year = 2020 and s.month = 4 and s.day = 29
>     and p.year = 2020 and p.month = 4 and p.day = 29
>     and p.event_id is null
>
> Until now I assumed that string-based queries and the Scala DSL lead to the
> same execution plan. Can someone point me to docs about the internals of
> this part of Spark? The official docs about SQL in general are not that
> verbose.
>
> Thanks in advance and stay safe!
>
> Roland Johann
>


-- 
I appreciate your time,

~Randy

randyclin...@gmail.com
