[ https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341791#comment-16341791 ]
Mathieu DESPRIEE commented on SPARK-23220:
------------------------------------------

Here is a gist that reproduces the problem: https://gist.github.com/mdespriee/6a0c7c36c536d2ec43ac1236863e9f04

The join with the broadcast hint is still planned as a SortMergeJoin:

{noformat}
== Parsed Logical Plan ==
Project [hostname#54, app_id#50, event#51, event_id#52, timestamp#53]
+- Join LeftAnti, (hostname#54 = hostname#13)
   :- Relation[app_id#50,event#51,event_id#52,timestamp#53,hostname#54] json
   +- ResolvedHint isBroadcastable=true
      +- LocalRelation [hostname#13, descr#14]

== Analyzed Logical Plan ==
hostname: string, app_id: string, event: string, event_id: string, timestamp: string
Project [hostname#54, app_id#50, event#51, event_id#52, timestamp#53]
+- Join LeftAnti, (hostname#54 = hostname#13)
   :- Relation[app_id#50,event#51,event_id#52,timestamp#53,hostname#54] json
   +- ResolvedHint isBroadcastable=true
      +- LocalRelation [hostname#13, descr#14]

== Optimized Logical Plan ==
Project [hostname#54, app_id#50, event#51, event_id#52, timestamp#53]
+- Join LeftAnti, (hostname#54 = hostname#13)
   :- Relation[app_id#50,event#51,event_id#52,timestamp#53,hostname#54] json
   +- Project [hostname#13]
      +- InMemoryRelation [hostname#13, descr#14], true, 10000, StorageLevel(disk, memory, 1 replicas)
            +- LocalTableScan [hostname#13, descr#14]

== Physical Plan ==
*Project [hostname#54, app_id#50, event#51, event_id#52, timestamp#53]
+- SortMergeJoin [hostname#54], [hostname#13], LeftAnti
   :- *Sort [hostname#54 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(hostname#54, 10)
   :     +- *FileScan json [app_id#50,event#51,event_id#52,timestamp#53,hostname#54] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/tmp/events3951774258862794179/data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<app_id:string,event:string,event_id:string,timestamp:string,hostname:string>
   +- *Sort [hostname#13 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(hostname#13, 10)
         +- InMemoryTableScan [hostname#13]
               +- InMemoryRelation [hostname#13, descr#14], true, 10000, StorageLevel(disk, memory, 1 replicas)
                     +- LocalTableScan [hostname#13, descr#14]
{noformat}

> broadcast hint not applied in a streaming left anti join
> --------------------------------------------------------
>
>                 Key: SPARK-23220
>                 URL: https://issues.apache.org/jira/browse/SPARK-23220
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.1
>            Reporter: Mathieu DESPRIEE
>            Priority: Major
>         Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti join between a stream
> and a static dataframe. The static dataframe is quite small (a few hundred
> rows), but by default the query plan uses a sort-merge join.
>
> Sometimes we need to re-process historical data, so we feed the same app
> with a FileSource pointing to our S3 storage containing all archives.
> In that situation, the first mini-batch is quite heavy (several hundred
> thousand input files), and the time spent in the sort-merge join is
> unacceptable. Additionally, the data is highly skewed, so partition sizes
> are completely uneven, and executors tend to crash with OOMs.
>
> I tried to switch to a broadcast join, but Spark still applies a sort-merge join:
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is:
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
>    <-- snip -->
>       +- Join LeftAnti, (hostname#3584 = hostname#190)
>          :- Project [app_id, ...
>          :  <-- snip -->
>          :     +- StreamingExecutionRelation FileStreamSource[s3://xxxx{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], [app_id <--snip--> ... 62 more fields]
>          +- ResolvedHint isBroadcastable=true
>             +- Relation[hostname#190,descr#191] RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
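For comparison with the sort-merge plan in this issue, here is a minimal plain-Scala sketch (no Spark dependency) of what a broadcast hash left anti join computes for `ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")`. The small `hostnames` side is turned into a hash set once, and each streamed row is kept only if its hostname is absent from that set, so the large side never has to be shuffled or sorted. The `Event` case class and the `leftAntiBroadcast` helper are hypothetical names for illustration; this models the join's semantics and is not Spark's `BroadcastHashJoinExec`.

```scala
// Hypothetical sketch: plain Scala, no Spark dependency.
// Models the semantics of a broadcast hash left anti join on "hostname".
object BroadcastAntiJoinSketch {

  // Hypothetical row type standing in for the streamed events.
  final case class Event(appId: String, event: String, hostname: String)

  /** Keeps the events whose hostname does NOT appear in `hostnames`.
    *
    * The small side is materialized once as a hash set (the "broadcast"),
    * then each event is probed against it; the large side is never sorted.
    * Nulls are dropped from the build side, and a null hostname on the
    * streamed side never matches, mirroring SQL `=` semantics.
    */
  def leftAntiBroadcast(events: Iterator[Event], hostnames: Seq[String]): Iterator[Event] = {
    val buildSide: Set[String] = hostnames.iterator.filter(_ != null).toSet
    events.filter(e => e.hostname == null || !buildSide.contains(e.hostname))
  }

  def main(args: Array[String]): Unit = {
    val events = Iterator(
      Event("a1", "start", "web-1"),
      Event("a2", "stop", "db-1"),
      Event("a3", "start", "web-2")
    )
    // "db-1" is in the filter list, so only a1 and a3 survive.
    val kept = leftAntiBroadcast(events, Seq("db-1")).map(_.appId).toList
    println(kept) // List(a1, a3)
  }
}
```

Because the build side is only a few hundred rows here, holding it as an in-memory set on every executor is cheap, which is why the reporter expected the broadcast hint to avoid the shuffle entirely.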
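The uneven partition sizes the reporter describes are consistent with how the sort-merge join is planned: the physical plan in the comment shows `Exchange hashpartitioning(hostname#54, 10)`, so every row with a given hostname is sent to the same one of 10 partitions. Assuming the skew comes from a few hostnames that dominate the archive (the issue does not say which key is skewed), those partitions grow far larger than the others. The plain-Scala sketch below counts rows per partition under that scheme. `partitionOf` and `partitionSizes` are hypothetical helpers, and they use `String.hashCode` as a stand-in; Spark SQL uses a Murmur3-based hash, so actual partition ids differ, but the concentration effect is the same.

```scala
// Hypothetical sketch: plain Scala, no Spark dependency.
// Stand-in hash: String.hashCode (Spark SQL uses Murmur3), so ids differ from Spark's.
object HashPartitionSkewSketch {

  /** Partition id for a non-null key, using a non-negative modulo like SQL `pmod`. */
  def partitionOf(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  /** Number of rows each partition receives when rows are hash-partitioned by key. */
  def partitionSizes(keys: Seq[String], numPartitions: Int): Vector[Int] = {
    val sizes = Array.fill(numPartitions)(0)
    keys.foreach(k => sizes(partitionOf(k, numPartitions)) += 1)
    sizes.toVector
  }

  def main(args: Array[String]): Unit = {
    // One "hot" hostname dominates a hypothetical historical backlog.
    val keys = Seq.fill(9000)("hot-host") ++ (1 to 1000).map(i => s"host-$i")
    val sizes = partitionSizes(keys, 10)
    // The partition holding "hot-host" receives at least 9000 of the 10000 rows.
    println(s"largest partition: ${sizes.max} of ${keys.size} rows")
  }
}
```

Adding more shuffle partitions does not help in this situation, since all rows for one key still land in a single partition; a broadcast join sidesteps the problem by not shuffling the large side at all.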