[ https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342339#comment-16342339 ]

Mathieu DESPRIEE edited comment on SPARK-23220 at 1/27/18 10:04 PM:
--------------------------------------------------------------------

Found that the problem is related to the persist() operation. When the 
right-hand side of the join is cached, there is an InMemoryRelation in the 
plan. The sizeInBytes of InMemoryRelation defaults to 
`spark.sql.defaultSizeInBytes`, which is Long.MaxValue.
In turn, canBroadcast() in the JoinSelection strategy returns false, which 
prevents the BroadcastJoin.
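
For reference, here is a minimal sketch of the repro (dataset names and paths 
are illustrative, not from the actual app):
{noformat}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Stand-ins for the real datasets: a large JSON relation and a small
// static dataframe of hostnames.
val events    = spark.read.json("/tmp/events/data")
val hostnames = Seq("host-a", "host-b").toDF("hostname")

// Without persist(), the hint is honored and a BroadcastHashJoin is planned.
events.join(broadcast(hostnames), Seq("hostname"), "leftanti").explain()

// With persist(), an InMemoryRelation appears in the plan; its sizeInBytes
// defaults to spark.sql.defaultSizeInBytes (Long.MaxValue), so a
// SortMergeJoin is planned instead.
events.join(broadcast(hostnames.persist()), Seq("hostname"), "leftanti").explain()
{noformat}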

Without the persist(), the plan is completely different:
{noformat}
== Optimized Logical Plan ==
Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- Join LeftAnti, (hostname#44 = hostname#13)
   :- Relation[app_id#40,event#41,event_id#42,timestamp#43,hostname#44] json
   +- ResolvedHint isBroadcastable=true
      +- LocalRelation [hostname#13]

== Physical Plan ==
*Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- *BroadcastHashJoin [hostname#44], [hostname#13], LeftAnti, BuildRight
   :- *FileScan json [app_id#40,event#41,event_id#42,timestamp#43,hostname#44] 
Batched: false, Format: JSON, Location: 
InMemoryFileIndex[file:/tmp/events628533427690545694/data], PartitionFilters: 
[], PushedFilters: [], ReadSchema: 
struct<app_id:string,event:string,event_id:string,timestamp:string,hostname:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]))
      +- LocalTableScan [hostname#13]
{noformat}

The problem of stats in InMemoryRelation is addressed by SPARK-22673. I'll 
check if it fixes this bug as well.
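
One way to check (assuming a build with the SPARK-22673 patch, and reusing the 
names from the sketch above) would be to materialize the cache and re-examine 
the plan:
{noformat}
// Materialize the cache so that actual batch statistics exist.
val cachedHostnames = hostnames.persist()
cachedHostnames.count()

// With SPARK-22673, InMemoryRelation should report the real size of the
// cached batches instead of spark.sql.defaultSizeInBytes, so the plan
// should show a BroadcastHashJoin again.
events.join(broadcast(cachedHostnames), Seq("hostname"), "leftanti").explain()
{noformat}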



> broadcast hint not applied in a streaming left anti join
> --------------------------------------------------------
>
>                 Key: SPARK-23220
>                 URL: https://issues.apache.org/jira/browse/SPARK-23220
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.1
>            Reporter: Mathieu DESPRIEE
>            Priority: Major
>         Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti-join between a stream 
> and a static dataframe. The static dataframe is quite small (a few hundred 
> rows), but by default the query plan is a sort-merge join.
>   
>  Sometimes we need to re-process some historical data, so we feed the same 
> app with a FileSource pointing to our S3 storage with all the archives. In 
> that situation, the first mini-batch is quite heavy (several hundred 
> thousand input files), and the time spent in the sort-merge join is 
> unacceptable. Additionally, the data is highly skewed, so partition sizes 
> are completely uneven, and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge 
> join.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is:
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>          +- Join LeftAnti, (hostname#3584 = hostname#190)
>             :- Project [app_id, ...
> <-- snip -->
>                                        +- StreamingExecutionRelation 
> FileStreamSource[s3://xxxx{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], 
> [app_id
>  <--snip--> ... 62 more fields]
>             +- ResolvedHint isBroadcastable=true
>                +- Relation[hostname#190,descr#191] 
> RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}


