[jira] [Commented] (SPARK-17859) persist should not impede with spark's ability to perform a broadcast join.

2019-04-30 Thread colin fang (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830504#comment-16830504 ]

colin fang commented on SPARK-17859:


The case above works for me in Spark 2.4:
{code:python}
from pyspark.sql import functions as F

# Turn off size-based auto-broadcast so only the explicit hint can trigger a
# broadcast (-1 is the documented "disable" value; 0 has the same effect for
# non-empty inputs).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
df_large = spark.range(1e6)
df_small = F.broadcast(spark.range(10).coalesce(1)).cache()
df_large.join(df_small, "id").explain()

== Physical Plan ==
*(2) Project [id#0L]
+- *(2) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight
   :- *(2) Range (0, 100, step=1, splits=4)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *(1) InMemoryTableScan [id#2L]
            +- InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- Coalesce 1
                     +- *(1) Range (0, 10, step=1, splits=4)
{code}
However, I have definitely seen cases where `F.broadcast` is ignored for a cached
DataFrame, although I have not been able to reduce them to a minimal example.
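For reference, without a hint Spark decides whether to broadcast a join side by comparing that side's estimated size with `spark.sql.autoBroadcastJoinThreshold` (10 MB by default; the documentation gives -1 as the value that disables broadcasting). The snippet above sets it to 0, which leaves the `F.broadcast` hint as the only way to get a broadcast. The following is a simplified Python model of that size rule, not Spark's own code: `can_broadcast_by_size` and `pick_join` are hypothetical names, the sizes are made-up estimates, hints are ignored, and checking the right side first is an assumption (Spark versions differ in how they choose a build side).

```python
# Documented default of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_THRESHOLD = 10 * 1024 * 1024


def can_broadcast_by_size(estimated_bytes, threshold=DEFAULT_THRESHOLD):
    """Simplified size rule: a side qualifies if 0 <= estimate <= threshold.

    With threshold = -1 nothing qualifies; with threshold = 0 only an
    estimate of exactly 0 bytes qualifies.
    """
    return 0 <= estimated_bytes <= threshold


def pick_join(left_bytes, right_bytes, threshold=DEFAULT_THRESHOLD):
    """Hypothetical chooser for an inner equi-join, ignoring hints.

    Checks the right side first (an assumption, not Spark's exact policy)
    and falls back to a sort-merge join when neither side qualifies.
    """
    if can_broadcast_by_size(right_bytes, threshold):
        return "BroadcastHashJoin BuildRight"
    if can_broadcast_by_size(left_bytes, threshold):
        return "BroadcastHashJoin BuildLeft"
    return "SortMergeJoin"


if __name__ == "__main__":
    small, huge = 8_000, 2**63 - 1  # made-up size estimates in bytes
    print(pick_join(small, small))               # BroadcastHashJoin BuildRight
    print(pick_join(small, huge))                # BroadcastHashJoin BuildLeft
    print(pick_join(huge, huge))                 # SortMergeJoin
    print(pick_join(small, small, threshold=0))  # SortMergeJoin
```

One consequence of this model: a side whose size estimate is very large is never broadcast automatically, however small its actual data is, so anything that inflates a cached relation's estimate would produce the symptom described in this issue.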

> persist should not impede with spark's ability to perform a broadcast join.
> ---
>
> Key: SPARK-17859
> URL: https://issues.apache.org/jira/browse/SPARK-17859
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.0.0
> Environment: spark 2.0.0, Linux RedHat
> Reporter: Franck Tago
> Priority: Major
>
> I am using Spark 2.0.0.
> My investigation leads me to conclude that calling persist can prevent a
> broadcast join from happening.
> Example
> Case 1: No persist call
> var df1 = spark.range(100).select($"id".as("id1"))
> df1: org.apache.spark.sql.DataFrame = [id1: bigint]
> var df2 = spark.range(1000).select($"id".as("id2"))
> df2: org.apache.spark.sql.DataFrame = [id2: bigint]
> df1.join(df2, $"id1" === $"id2").explain
> == Physical Plan ==
> *BroadcastHashJoin [id1#117L], [id2#123L], Inner, BuildRight
> :- *Project [id#114L AS id1#117L]
> :  +- *Range (0, 100, splits=2)
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
>    +- *Project [id#120L AS id2#123L]
>       +- *Range (0, 1000, splits=2)
> Case 2: persist call
> df1.persist.join(df2, $"id1" === $"id2").explain
> 16/10/10 15:50:21 WARN CacheManager: Asked to cache already cached data.
> == Physical Plan ==
> *SortMergeJoin [id1#3L], [id2#9L], Inner
> :- *Sort [id1#3L ASC], false, 0
> :  +- Exchange hashpartitioning(id1#3L, 10)
> :     +- InMemoryTableScan [id1#3L]
> :        :  +- InMemoryRelation [id1#3L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
> :        :     :  +- *Project [id#0L AS id1#3L]
> :        :     :     +- *Range (0, 100, splits=2)
> +- *Sort [id2#9L ASC], false, 0
>    +- Exchange hashpartitioning(id2#9L, 10)
>       +- InMemoryTableScan [id2#9L]
>          :  +- InMemoryRelation [id2#9L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
>          :     :  +- *Project [id#6L AS id2#9L]
>          :     :     +- *Range (0, 1000, splits=2)
> Why does the persist call prevent the broadcast join? In my opinion it should not.
> I was told that persist is lazy and that this might have something to do with it,
> but I still contend that it should not.
> Losing broadcast joins is very costly.
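The key difference between the two plans is the join operator: `BroadcastHashJoin` in Case 1, `SortMergeJoin` (with the extra `Exchange` and `Sort` steps) in Case 2. When trying to reproduce this, a small check over the `explain` output makes the difference easy to assert. A minimal Python sketch, assuming the plan has already been captured as text (PySpark's `DataFrame.explain()` prints with Python's `print`, so `contextlib.redirect_stdout` can capture it); `join_operators` and `uses_broadcast_join` are hypothetical helper names, and the operator list is not exhaustive:

```python
import re

# Physical join operators that can appear in a Spark plan (not exhaustive).
JOIN_OPERATORS = (
    "BroadcastHashJoin",
    "SortMergeJoin",
    "ShuffledHashJoin",
    "BroadcastNestedLoopJoin",
    "CartesianProduct",
)

# Whole-word match so e.g. "*BroadcastHashJoin" or "*(2) SortMergeJoin" is found.
_JOIN_RE = re.compile(r"\b(" + "|".join(JOIN_OPERATORS) + r")\b")


def join_operators(plan_text):
    """Return the join operators found in explain() output, in plan order."""
    return _JOIN_RE.findall(plan_text)


def uses_broadcast_join(plan_text):
    """True if any join in the plan is a BroadcastHashJoin."""
    return "BroadcastHashJoin" in join_operators(plan_text)


if __name__ == "__main__":
    # Abbreviated copies of the Case 1 and Case 2 plans above.
    case1 = "*BroadcastHashJoin [id1#117L], [id2#123L], Inner, BuildRight\n"
    case2 = "*SortMergeJoin [id1#3L], [id2#9L], Inner\n"
    print(join_operators(case1), uses_broadcast_join(case1))
    print(join_operators(case2), uses_broadcast_join(case2))
```

With a live PySpark session, this could be used by creating `buf = io.StringIO()`, running `df.explain()` inside `with contextlib.redirect_stdout(buf):`, and then calling `uses_broadcast_join(buf.getvalue())`.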



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17859) persist should not impede with spark's ability to perform a broadcast join.

2016-12-08 Thread Andrew Ray (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15733247#comment-15733247 ]

Andrew Ray commented on SPARK-17859:


This appears to be fixed in 2.0.2:

{code}
scala> df1.persist.join(df2, $"id1" === $"id2").explain
== Physical Plan ==
*BroadcastHashJoin [id1#3L], [id2#9L], Inner, BuildRight
:- InMemoryTableScan [id1#3L]
:  :  +- InMemoryRelation [id1#3L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
:  :     :  +- *Project [id#0L AS id1#3L]
:  :     :     +- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Project [id#6L AS id2#9L]
      +- *Range (0, 1000, step=1, splits=Some(8))
{code}
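In this plan, `BuildRight` means the right input of the join (the `BroadcastExchange` over `Project [id#6L AS id2#9L]`, i.e. `df2`) is the side being broadcast, while the persisted `df1` is read through `InMemoryTableScan` on the streamed side. A minimal Python sketch for pulling that information out of the plan text; the helper name `broadcast_build_side` is hypothetical, and the regular expression assumes the `BroadcastHashJoin [left keys], [right keys], JoinType, BuildSide` line format shown in the plans in this thread:

```python
import re

# Matches lines such as
#   *BroadcastHashJoin [id1#3L], [id2#9L], Inner, BuildRight
# The format is an assumption based on the plans quoted in this issue.
_BHJ_RE = re.compile(
    r"BroadcastHashJoin \[(?P<left>[^\]]*)\], \[(?P<right>[^\]]*)\], "
    r"(?P<join_type>\w+), (?P<build>BuildLeft|BuildRight)"
)


def broadcast_build_side(plan_text):
    """Return (join_type, build_side, broadcast_keys) for the first
    BroadcastHashJoin in the plan text, or None if there is none."""
    m = _BHJ_RE.search(plan_text)
    if m is None:
        return None
    # The build side is the one that gets broadcast; report its join keys.
    keys = m.group("left") if m.group("build") == "BuildLeft" else m.group("right")
    return m.group("join_type"), m.group("build"), keys


if __name__ == "__main__":
    plan = "*BroadcastHashJoin [id1#3L], [id2#9L], Inner, BuildRight\n:- InMemoryTableScan [id1#3L]"
    print(broadcast_build_side(plan))  # ('Inner', 'BuildRight', 'id2#9L')
```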
