Hello everyone, thanks a lot for the help. We managed to solve it too, but without resorting to Spark 1.6.
The problem was caused by a really bad join condition:

    ON ((a.col1 = b.col1) or (a.col1 is null and b.col1 is null))
    AND ((a.col2 = b.col2) or (a.col2 is null and b.col2 is null))

So we reworked our logic to remove the null checks from the join condition, and the join went lightning fast afterwards :)

On Feb 22, 2016 21:24, "Dave Moyers" <davemoy...@icloud.com> wrote:

> Good article! Thanks for sharing!
>
> > On Feb 22, 2016, at 11:10 AM, Davies Liu <dav...@databricks.com> wrote:
> >
> > This link may help:
> > https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html
> >
> > Spark 1.6 had improved the CartesianProduct, you should turn off auto
> > broadcast and go with CartesianProduct in 1.6
> >
> > On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali <man...@gmail.com> wrote:
> >> Hello everyone,
> >>
> >> I'm working with Tamara and I wanted to give you guys an update on the
> >> issue:
> >>
> >> 1. Here is the output of .explain():
> >>>
> >>> Project [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS new_gender#42,fk_created_at_date#32 AS new_fk_created_at_date#43,age_range#30 AS new_age_range#44,first_name#27 AS new_first_name#45,last_name#28 AS new_last_name#46]
> >>> BroadcastNestedLoopJoin BuildLeft, LeftOuter, Some((((customer_id#1L = customer_id#25L) || (isnull(customer_id#1L) && isnull(customer_id#25L))) && ((country#2 = country#24) || (isnull(country#2) && isnull(country#24)))))
> >>> Scan PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
> >>> Scan ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
> >>
> >> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a difference.
> >> It still hangs indefinitely.
> >> 3. We are using Spark 1.5.2
> >> 4. We tried running this with 4 executors, 9 executors, and even in local
> >> mode with master set to "local[4]". The issue still persists in all cases.
> >> 5. Even without trying to cache any of the dataframes this issue still
> >> happens.
> >> 6. We have about 200 partitions.
> >>
> >> Any help would be appreciated!
> >>
> >> Best Regards,
> >> Mo
> >>
> >> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
> >> wrote:
> >>>
> >>> Sorry,
> >>>
> >>> please include the following questions to the list above:
> >>>
> >>> the SPARK version?
> >>> whether you are using RDD or DataFrames?
> >>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
> >>>
> >>> Regards,
> >>> Gourav Sengupta
> >>>
> >>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
> >>> <gourav.sengu...@gmail.com> wrote:
> >>>>
> >>>> Hi Tamara,
> >>>>
> >>>> few basic questions first.
> >>>>
> >>>> How many executors are you using?
> >>>> Is the data getting all cached into the same executor?
> >>>> How many partitions do you have of the data?
> >>>> How many fields are you trying to use in the join?
> >>>>
> >>>> If you need any help in finding answer to these questions please let me
> >>>> know. From what I reckon joins like yours should not take more than a few
> >>>> milliseconds.
> >>>>
> >>>> Regards,
> >>>> Gourav Sengupta
> >>>>
> >>>> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <t...@hellofresh.com> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I am running a Spark job that gets stuck attempting to join two
> >>>>> dataframes. The dataframes are not very large, one is about 2 M rows, and
> >>>>> the other a couple of thousand rows and the resulting joined dataframe
> >>>>> should be about the same size as the smaller dataframe. I have tried
> >>>>> triggering execution of the join using the 'first' operator, which as far as
> >>>>> I understand would not require processing the entire resulting dataframe
> >>>>> (maybe I am mistaken though). The Spark UI is not telling me anything, just
> >>>>> showing the task to be stuck.
> >>>>>
> >>>>> When I run the exact same job on a slightly smaller dataset it works
> >>>>> without hanging.
> >>>>>
> >>>>> I have used the same environment to run joins on much larger dataframes,
> >>>>> so I am confused as to why in this particular case my Spark job is just
> >>>>> hanging. I have also tried running the same join operation using pyspark on
> >>>>> two 2 Million row dataframes (exactly like the one I am trying to join in
> >>>>> the job that gets stuck) and it runs successfully.
> >>>>>
> >>>>> I have tried caching the joined dataframe to see how much memory it is
> >>>>> requiring but the job gets stuck on this action too. I have also tried using
> >>>>> persist to memory and disk on the join, and the job seems to be stuck all
> >>>>> the same.
> >>>>>
> >>>>> Any help as to where to look for the source of the problem would be much
> >>>>> appreciated.
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Tamara
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
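
P.S. For anyone landing on this thread later, here is a rough sketch of why the join condition at the top of this message hurt so much. It is plain Python rather than Spark code, and the function names and sample rows are made up for illustration. The .explain() output quoted above shows a BroadcastNestedLoopJoin, presumably because the OR-ed null checks keep the condition from being treated as a simple equality on the join keys. The sketch contrasts that pairwise evaluation with a hash lookup on plain key equality.

```python
# Plain-Python illustration only, not Spark code. Rows are (col1, col2, payload)
# tuples and None stands in for SQL NULL. All names here are hypothetical.

def null_safe_eq(x, y):
    """Mimics (x = y) OR (x IS NULL AND y IS NULL)."""
    if x is None or y is None:
        return x is None and y is None
    return x == y


def nested_loop_left_join(left, right):
    """Evaluates the predicate for every (left, right) pair: len(left) * len(right) checks."""
    out = []
    for l in left:
        matches = [r for r in right
                   if null_safe_eq(l[0], r[0]) and null_safe_eq(l[1], r[1])]
        if matches:
            out.extend((l, r) for r in matches)
        else:
            out.append((l, None))  # left outer join keeps unmatched left rows
    return out


def hash_left_join(left, right):
    """Builds a dict on the join key once, then does one lookup per left row."""
    index = {}
    for r in right:
        index.setdefault((r[0], r[1]), []).append(r)
    out = []
    for l in left:
        # Note: Python treats None as equal to None in dict keys, unlike SQL '=',
        # so this only models the shape of the lookup, not SQL NULL semantics.
        matches = index.get((l[0], l[1]), [])
        if matches:
            out.extend((l, r) for r in matches)
        else:
            out.append((l, None))
    return out


left_rows = [(1, "DE", "a"), (None, "DE", "b"), (2, None, "c"), (3, "US", "d")]
right_rows = [(1, "DE", "x"), (None, "DE", "y"), (2, None, "z")]

print(nested_loop_left_join(left_rows, right_rows) == hash_left_join(left_rows, right_rows))
```

Both functions return the same rows for the sample data. The difference is that the first compares every left row with every right row, while the second does a single dictionary lookup per left row, which is roughly the gap between a nested loop join and a join on plain key equality.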