[ https://issues.apache.org/jira/browse/SPARK-17859 ]
Fernando Pereira reopened SPARK-17859:
--------------------------------------

This bug persists in 2.2.1: an explicit {{broadcast()}} hint applied before {{cache()}} is ignored and the planner falls back to a sort-merge join, while dropping the cache with {{unpersist()}} restores the broadcast join. A possible workaround is sketched after the quoted report below.

{code:python}
SPARK version 2.2.1
SparkSession available as 'spark'.

In [1]: df_large = spark.range(1e6)

In [2]: spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)

In [3]: df_small = spark.range(10)

In [5]: from pyspark.sql.functions import broadcast

In [6]: df_small = broadcast(spark.range(10).coalesce(1)).cache()

In [7]: df_large.join(df_small, "id").explain()
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#6L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 1000000, step=1, splits=4)
   +- *Sort [id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#6L, 200)
         +- InMemoryTableScan [id#6L]
               +- InMemoryRelation [id#6L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- Coalesce 1
                        +- *Range (0, 10, step=1, splits=4)

In [8]: df_large.join(df_small.unpersist(), "id").explain()
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#6L], Inner, BuildRight
   :- *Range (0, 1000000, step=1, splits=4)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- Coalesce 1
         +- *Range (0, 10, step=1, splits=4)
{code}

> persist should not impede Spark's ability to perform a broadcast join
> ----------------------------------------------------------------------
>
>                 Key: SPARK-17859
>                 URL: https://issues.apache.org/jira/browse/SPARK-17859
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.0.0
>        Environment: spark 2.0.0, Linux RedHat
>            Reporter: Franck Tago
>            Priority: Major
>             Fix For: 2.0.2
>
>
> I am using Spark 2.0.0.
> My investigation leads me to conclude that calling persist can prevent a broadcast join from happening.
>
> Example
>
> Case 1: no persist call
>
> var df1 = spark.range(1000000).select($"id".as("id1"))
> df1: org.apache.spark.sql.DataFrame = [id1: bigint]
>
> var df2 = spark.range(1000).select($"id".as("id2"))
> df2: org.apache.spark.sql.DataFrame = [id2: bigint]
>
> df1.join(df2, $"id1" === $"id2").explain
> == Physical Plan ==
> *BroadcastHashJoin [id1#117L], [id2#123L], Inner, BuildRight
> :- *Project [id#114L AS id1#117L]
> :  +- *Range (0, 1000000, splits=2)
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
>    +- *Project [id#120L AS id2#123L]
>       +- *Range (0, 1000, splits=2)
>
> Case 2: persist call
>
> df1.persist.join(df2, $"id1" === $"id2").explain
> 16/10/10 15:50:21 WARN CacheManager: Asked to cache already cached data.
> == Physical Plan ==
> *SortMergeJoin [id1#3L], [id2#9L], Inner
> :- *Sort [id1#3L ASC], false, 0
> :  +- Exchange hashpartitioning(id1#3L, 10)
> :     +- InMemoryTableScan [id1#3L]
> :           +- InMemoryRelation [id1#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
> :                 +- *Project [id#0L AS id1#3L]
> :                    +- *Range (0, 1000000, splits=2)
> +- *Sort [id2#9L ASC], false, 0
>    +- Exchange hashpartitioning(id2#9L, 10)
>       +- InMemoryTableScan [id2#9L]
>             +- InMemoryRelation [id2#9L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- *Project [id#6L AS id2#9L]
>                      +- *Range (0, 1000, splits=2)
>
> Why does the persist call prevent the broadcast join? My opinion is that it should not. I was made aware that the persist call is lazy and that might have something to do with it, but I still contend that it should not. Losing broadcast joins is really costly.
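For anyone hitting this in the meantime: a minimal workaround sketch, assuming the hint machinery in 2.2.x behaves as the plans above suggest. The idea is to apply {{broadcast()}} at the join site, on top of the already-cached DataFrame, rather than before {{cache()}}; the hint then lives in the join's query plan instead of inside the cached plan, so substituting the InMemoryRelation should not discard it. Treat this as an illustration to verify on your own version, not a confirmed fix.

{code:python}
# Workaround sketch (assumption, not from the original report):
# apply the broadcast hint at the join site instead of before cache().
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()
# Disable size-based broadcasts so only the explicit hint can trigger one.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)

df_large = spark.range(1000000)
df_small = spark.range(10).coalesce(1).cache()
df_small.count()  # materialize the cache

# broadcast() wraps the cached DataFrame here, so the hint sits above the
# InMemoryRelation in the join's logical plan rather than being cached away.
df_large.join(broadcast(df_small), "id").explain()
# Expected (to be verified): *BroadcastHashJoin ... BuildRight, even though
# df_small is cached and the auto-broadcast threshold is 0.
{code}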