Does it still plan an inner join if you remove the filters on both tables? It seems like you are asking for a left join, but your WHERE filters on p demand the behavior of an inner join: they reject the null-extended rows that a left join would produce for unmatched rows of s.
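To illustrate the mechanism: a WHERE predicate on the right table's columns is "null-rejecting", so the optimizer may legally rewrite the left outer join as an inner join. Here is a plain-Python sketch of that logic (no Spark required; the rows and column values are made up for illustration):

```python
# Hypothetical tables: s is the left side, p the right side.
s = [{"event_id": "e1"}, {"event_id": "e2"}]
p = [{"source_event_id": "e1", "day": 29}]

# Left outer join: unmatched left rows are padded with None (SQL NULL)
# for the right table's columns.
joined = []
for srow in s:
    matches = [prow for prow in p if prow["source_event_id"] == srow["event_id"]]
    if matches:
        joined.extend({**srow, **m} for m in matches)
    else:
        joined.append({**srow, "source_event_id": None, "day": None})

# A WHERE filter like "p.day = 29" rejects the null-padded rows,
# so the surviving rows are exactly the inner-join result: e2 is gone.
filtered = [r for r in joined if r["day"] == 29]
print([r["event_id"] for r in filtered])  # -> ['e1']

# Filtering p *before* the join keeps the left-outer semantics:
# e2 would still appear, just with null right-side columns.
p_filtered = [prow for prow in p if prow["day"] == 29]
```

Spark's plan in your mail shows exactly this rewrite: the join node says `Inner` even though the SQL says `left outer join`.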
Maybe you could apply the filters to each table first and then join the results. Something roughly like:

    val s2 = s_DF.filter("year = 2020 and month = 4 and day = 29")
    val p2 = p_DF.filter("year = 2020 and month = 4 and day = 29 and event_id is null")
    val output = s2.join(p2, s2("event_id") === p2("source_event_id"), "left")

On Thu, Apr 30, 2020 at 11:06 AM Roland Johann
<roland.joh...@phenetic.io.invalid> wrote:

> Hi All,
>
> we are on vanilla Spark 2.4.4 and currently experience a somewhat strange
> behavior of the query planner/optimizer and therefore get wrong results.
>
> select
>   s.event_id as search_event_id,
>   s.query_string,
>   p.event_id
> from s
> left outer join p on s.event_id = p.source_event_id
> where
>   s.year = 2020 and s.month = 4 and s.day = 29
>   and p.year = 2020 and p.month = 4 and p.day = 29
> limit 1
>
> This query leads to this plan:
>
> *(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, event_id#12209]
> +- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, BuildLeft
>    :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
>    :  +- *(1) Project [event_id#12131, query_string#12178]
>    :     +- *(1) Filter isnotnull(event_id#12131)
>    :        +- *(1) FileScan parquet s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196]
>    :           Batched: true, Format: Parquet,
>    :           Location: PrunedInMemoryFileIndex[hdfs://<some-path>/search/year=2020/month=4/day=29/...,
>    :           PartitionCount: 1,
>    :           PartitionFilters: [isnotnull(year#12194), isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), (month...,
>    :           PushedFilters: [IsNotNull(event_id)],
>    :           ReadSchema: struct<event_id:string,query_string:string>
>    +- *(2) Project [event_id#12209, source_event_id#12221]
>       +- *(2) Filter isnotnull(source_event_id#12221)
>          +- *(2) FileScan parquet s[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310]
>             Batched: true, Format: Parquet,
>             Location: PrunedInMemoryFileIndex[hdfs://<some-path>/p/year=2020/month=4/day=2...,
>             PartitionCount: 1,
>             PartitionFilters: [isnotnull(day#12310), isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), (month...,
>             PushedFilters: [IsNotNull(source_event_id)],
>             ReadSchema: struct<event_id:string,source_event_id:string>
>
> Without partition pruning the join gets planned as LeftOuter with a
> SortMergeJoin, but we need partition pruning in this case to prevent full
> table scans and to profit from the broadcast join...
>
> As soon as we rewrite the query in Scala, the plan looks fine:
>
> val s = spark.sql("select event_id, query_string from ssi_kpi.search where year = 2020 and month = 4 and day = 29")
> val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show where year = 2020 and month = 4 and day = 29")
>
> s
>   .join(p, s("event_id") <=> p("source_event_id"), "left_outer")
>   .groupBy(s("query_string"))
>   .agg(count(s("query_string")), count(p("event_id")))
>   .show()
>
> The second thing we saw is that conditions in the where clause on joined
> tables get pushed down to the parquet files and lead to wrong results, for
> example:
>
> select
>   s.event_id as search_event_id,
>   s.query_string,
>   p.event_id
> from s
> left outer join p on s.event_id = p.source_event_id
> where
>   s.year = 2020 and s.month = 4 and s.day = 29
>   and p.year = 2020 and p.month = 4 and p.day = 29
>   and p.event_id is null
>
> Until now I assumed that string-based queries and the Scala DSL lead to
> the same execution plan. Can someone point me to docs about the internals
> of this topic in Spark? The official docs about SQL in general are not
> that verbose.
>
> Thanks in advance and stay safe!
>
> Roland Johann

--
I appreciate your time,
~Randy
randyclin...@gmail.com