[ https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341791#comment-16341791 ]

Mathieu DESPRIEE commented on SPARK-23220:
------------------------------------------

Here is a gist to reproduce the problem:
https://gist.github.com/mdespriee/6a0c7c36c536d2ec43ac1236863e9f04

The join with the broadcast hint is planned as a SortMergeJoin instead of a broadcast join:

{noformat}
== Parsed Logical Plan ==
Project [hostname#54, app_id#50, event#51, event_id#52, timestamp#53]
+- Join LeftAnti, (hostname#54 = hostname#13)
   :- Relation[app_id#50,event#51,event_id#52,timestamp#53,hostname#54] json
   +- ResolvedHint isBroadcastable=true
      +- LocalRelation [hostname#13, descr#14]

== Analyzed Logical Plan ==
hostname: string, app_id: string, event: string, event_id: string, timestamp: string
Project [hostname#54, app_id#50, event#51, event_id#52, timestamp#53]
+- Join LeftAnti, (hostname#54 = hostname#13)
   :- Relation[app_id#50,event#51,event_id#52,timestamp#53,hostname#54] json
   +- ResolvedHint isBroadcastable=true
      +- LocalRelation [hostname#13, descr#14]

== Optimized Logical Plan ==
Project [hostname#54, app_id#50, event#51, event_id#52, timestamp#53]
+- Join LeftAnti, (hostname#54 = hostname#13)
   :- Relation[app_id#50,event#51,event_id#52,timestamp#53,hostname#54] json
   +- Project [hostname#13]
      +- InMemoryRelation [hostname#13, descr#14], true, 10000, StorageLevel(disk, memory, 1 replicas)
            +- LocalTableScan [hostname#13, descr#14]

== Physical Plan ==
*Project [hostname#54, app_id#50, event#51, event_id#52, timestamp#53]
+- SortMergeJoin [hostname#54], [hostname#13], LeftAnti
   :- *Sort [hostname#54 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(hostname#54, 10)
   :     +- *FileScan json [app_id#50,event#51,event_id#52,timestamp#53,hostname#54] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/tmp/events3951774258862794179/data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<app_id:string,event:string,event_id:string,timestamp:string,hostname:string>
   +- *Sort [hostname#13 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(hostname#13, 10)
         +- InMemoryTableScan [hostname#13]
               +- InMemoryRelation [hostname#13, descr#14], true, 10000, StorageLevel(disk, memory, 1 replicas)
                     +- LocalTableScan [hostname#13, descr#14]
{noformat}
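To make the cost of this plan concrete, here is a minimal plain-Scala sketch (no Spark dependency) of what the physical plan above does: both sides are hash-partitioned on the join key (the two `Exchange hashpartitioning(..., 10)` nodes), each partition is sorted (`Sort`), and each partition pair is merge-scanned, keeping only left rows whose key never appears on the right (`SortMergeJoin ... LeftAnti`). The object name, the `Row` case class and the partition count are hypothetical, and null keys are not modelled. Because every row with a given hostname lands in the same partition, one very frequent hostname produces one oversized partition, which is consistent with the skew and OOMs described in the issue.

```scala
// Hypothetical plain-Scala model of Exchange -> Sort -> SortMergeJoin LeftAnti.
// This is an illustration of the technique, not Spark's implementation.
object SortMergeAntiJoinSketch {
  // Hypothetical row type: the join key plus one payload column (no null keys).
  final case class Row(hostname: String, payload: String)

  // Like Exchange hashpartitioning(key, n): equal keys go to the same partition.
  def hashPartition(rows: Seq[Row], n: Int): Vector[Seq[Row]] = {
    val byPart = rows.groupBy(r => Math.floorMod(r.hostname.hashCode, n))
    Vector.tabulate(n)(p => byPart.getOrElse(p, Seq.empty[Row]))
  }

  // Merge-scan two partitions already sorted by hostname and keep the
  // left rows whose key has no equal key on the right side.
  def mergeAnti(left: Seq[Row], right: Seq[Row]): Seq[Row] = {
    val rightKeys = right.map(_.hostname).toIndexedSeq
    val out = Seq.newBuilder[Row]
    var j = 0
    for (l <- left) {
      // Advance past right keys that sort before the current left key.
      while (j < rightKeys.length && rightKeys(j).compareTo(l.hostname) < 0) j += 1
      val matched = j < rightKeys.length && rightKeys(j) == l.hostname
      if (!matched) out += l
    }
    out.result()
  }

  def sortMergeLeftAnti(left: Seq[Row], right: Seq[Row], n: Int): Seq[Row] = {
    val lp = hashPartition(left, n)  // shuffle of the large (streaming) side
    val rp = hashPartition(right, n) // shuffle of the small static side
    (0 until n).flatMap { p =>
      // Sort each partition pair, then merge them.
      mergeAnti(lp(p).sortBy(_.hostname), rp(p).sortBy(_.hostname))
    }
  }
}
```

For example, `sortMergeLeftAnti(events, hosts, 10)` returns the events whose hostname does not occur in `hosts`, but only after shuffling and sorting both inputs.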

> broadcast hint not applied in a streaming left anti join
> --------------------------------------------------------
>
>                 Key: SPARK-23220
>                 URL: https://issues.apache.org/jira/browse/SPARK-23220
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.1
>            Reporter: Mathieu DESPRIEE
>            Priority: Major
>         Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti join between a stream
> and a static dataframe. The static dataframe is quite small (a few hundred
> rows), but the query plan uses a sort-merge join by default.
>
> Sometimes we need to reprocess historical data, so we feed the same app
> with a FileSource pointing to our S3 storage with all archives. In that
> situation, the first mini-batch is very heavy (several hundred thousand
> input files), and the time spent in the sort-merge join is unacceptable.
> Additionally, the data is highly skewed, so partition sizes are very uneven,
> and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge join.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is:
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>          +- Join LeftAnti, (hostname#3584 = hostname#190)
>             :- Project [app_id, ...
> <-- snip -->
>                                        +- StreamingExecutionRelation FileStreamSource[s3://xxxx{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], [app_id <--snip--> ... 62 more fields]
>             +- ResolvedHint isBroadcastable=true
>                +- Relation[hostname#190,descr#191] RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}
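For comparison, this is roughly the plan that the hint in `ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")` asks for: a broadcast hash left anti join. Below is a minimal plain-Scala sketch, again with hypothetical names and no Spark dependency. The keys of the small side are collected once into a hash set (standing in for the broadcast), and each partition of the large side is filtered locally, so there is no Exchange and no Sort, and a frequent hostname is no longer funnelled into a single task. `Option[String]` models a nullable hostname. A left row with a null key is kept, following the SQL rule that `NULL = x` is never true.

```scala
// Hypothetical plain-Scala model of a broadcast hash left anti join.
// This is an illustration of the technique, not Spark's implementation.
object BroadcastAntiJoinSketch {
  // Hypothetical row type; None stands for a null hostname.
  final case class Row(hostname: Option[String], payload: String)

  // leftPartitions: the large (streaming) side, kept in its existing partitions.
  // right: the small static side that would be shipped to every task.
  def broadcastLeftAnti(leftPartitions: Seq[Seq[Row]], right: Seq[Row]): Seq[Seq[Row]] = {
    // Build the hash set once; null keys can never match, so they are dropped.
    val buildKeys: Set[String] = right.flatMap(_.hostname).toSet
    // Probe each partition independently: no shuffle and no sort.
    leftPartitions.map { part =>
      part.filter {
        case Row(None, _)    => true                   // NULL = x is never true: keep
        case Row(Some(k), _) => !buildKeys.contains(k) // keep only unmatched keys
      }
    }
  }
}
```

Each inner `Seq` stands for one task's partition. The partition layout of the large side is left unchanged, which is what makes a broadcast join attractive when one side is only a few hundred rows.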



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
