Nope. Count action did not help to choose broadcast join.

All of my tables are hive external tables. So, I tried to trigger compute 
statistics from sqlContext.sql.  It gives me an error saying “nonsuch table”. I 
am not sure that is due to following bug in 1.4.1

https://issues.apache.org/jira/browse/SPARK-8105 
<https://issues.apache.org/jira/browse/SPARK-8105>

I don’t find a way to enable broadcastHashjoin in my case :(


> On Aug 17, 2015, at 12:52 PM, Silvio Fiorito <silvio.fior...@granturing.com> 
> wrote:
> 
> Try doing a count on both lookups to force the caching to occur before the 
> join.
> 
> 
> 
> 
> On 8/17/15, 12:39 PM, "VIJAYAKUMAR JAWAHARLAL" <sparkh...@data2o.io> wrote:
> 
>> Thanks for your help
>> 
>> I tried to cache the lookup tables and left out join with the big table 
>> (DF). Join does not seem to be using broadcast join-still it goes with hash 
>> partition join and shuffling big table. Here is the scenario
>> 
>> 
>> …
>> table1 as big_df
>> left outer join
>> table2 as lkup
>> on big_df.lkupid = lkup.lkupid
>> 
>> table1 above is well distributed across all 40 partitions because 
>> sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, 
>> using just 2 partition.  s. After the join stage, sparkUI showed me that all 
>> activities ended up in  just 2 executors. When I tried to dump the data in 
>> hdfs after join stage, all data ended up in 2 partition files and rest 38 
>> files are 0 sized files.
>> 
>> Since above one did not work, I tried to broadcast DF and registered as 
>> table before join. 
>> 
>> val table2_df = sqlContext.sql("select * from table2")
>> val broadcast_table2 =sc.broadcast(table2_df)
>> broadcast_table2.value.registerTempTable(“table2”)
>> 
>> Broadcast is also having same issue as explained above. All data processed 
>> by just executors due to lookup skew.
>> 
>> Any more idea to tackle this issue in Spark Dataframe?
>> 
>> Thanks
>> Vijay
>> 
>> 
>>> On Aug 14, 2015, at 10:27 AM, Silvio Fiorito 
>>> <silvio.fior...@granturing.com> wrote:
>>> 
>>> You could cache the lookup DataFrames, it’ll then do a broadcast join.
>>> 
>>> 
>>> 
>>> 
>>> On 8/14/15, 9:39 AM, "VIJAYAKUMAR JAWAHARLAL" <sparkh...@data2o.io> wrote:
>>> 
>>>> Hi
>>>> 
>>>> I am facing huge performance problem when I am trying to left outer join 
>>>> very big data set (~140GB) with bunch of small lookups [Start schema 
>>>> type]. I am using data frame  in spark sql. It looks like data is shuffled 
>>>> and skewed when that join happens. Is there any way to improve performance 
>>>> of such type of join in spark? 
>>>> 
>>>> How can I hint optimizer to go with replicated join etc., to avoid 
>>>> shuffle? Would it help to create broadcast variables on small lookups?  If 
>>>> I create broadcast variables, how can I convert them into data frame and 
>>>> use them in sparksql type of join?
>>>> 
>>>> Thanks
>>>> Vijay
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>> 
>> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

Reply via email to