Hello everyone,

Thanks a lot for the help. We also managed to solve it, without
resorting to Spark 1.6.

The problem we were having was caused by a really bad join condition:

ON ((a.col1 = b.col1) OR (a.col1 IS NULL AND b.col1 IS NULL))
   AND ((a.col2 = b.col2) OR (a.col2 IS NULL AND b.col2 IS NULL))

So what we did was rework our logic to remove the null checks from the join
condition, and the join went lightning fast afterwards :) The OR-of-null-checks
prevents Spark from extracting plain equi-join keys, so the planner falls back
to a nested-loop join (the BroadcastNestedLoopJoin in the plan quoted below),
which compares every pair of rows.
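
In case anyone hits the same thing, here is a minimal sketch (PySpark; the
table and column names match the condition above, while the sentinel value
and the helper column names are made up for illustration) of the kind of
rework we mean: coalesce NULLs in the join keys to a sentinel on both sides,
so the condition collapses to a plain equi-join that the planner can run as
a hash or sort-merge join:

from pyspark.sql import functions as F

# Hypothetical sentinel; it must match the key's type and never occur in
# the real data (use an impossible numeric value for numeric keys).
NULL_KEY = "__NULL__"

# Replace NULLs in the join keys on both sides, so that
# "NULL matches NULL" becomes ordinary equality.
a2 = a.withColumn("col1_k", F.coalesce(a["col1"], F.lit(NULL_KEY))) \
      .withColumn("col2_k", F.coalesce(a["col2"], F.lit(NULL_KEY)))
b2 = b.withColumn("col1_k", F.coalesce(b["col1"], F.lit(NULL_KEY))) \
      .withColumn("col2_k", F.coalesce(b["col2"], F.lit(NULL_KEY)))

# A plain equi-join on the sanitized keys; drop the helper columns after.
joined = a2.join(b2, (a2["col1_k"] == b2["col1_k"]) &
                     (a2["col2_k"] == b2["col2_k"]), "left_outer")

Spark SQL also has a null-safe equality operator (<=>) that expresses the
same semantics directly; we haven't checked whether the 1.5.2 planner can
turn it into an equi-join, which is why the coalesce rework felt safer.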
On Feb 22, 2016 21:24, "Dave Moyers" <davemoy...@icloud.com> wrote:

> Good article! Thanks for sharing!
>
>
> > On Feb 22, 2016, at 11:10 AM, Davies Liu <dav...@databricks.com> wrote:
> >
> > This link may help:
> >
> https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html
> >
> > Spark 1.6 improved CartesianProduct; you should turn off auto-broadcast
> > and go with CartesianProduct in 1.6.
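> >
> > Untested sketch (PySpark; sqlContext is your existing SQLContext) of
> > what I mean by turning off auto-broadcast:
> >
> > # Setting the broadcast size threshold to -1 disables automatic
> > # broadcast joins, so the planner can fall back to CartesianProduct.
> > sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")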
> >
> > On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali <man...@gmail.com> wrote:
> >> Hello everyone,
> >>
> >> I'm working with Tamara and I wanted to give you guys an update on the
> >> issue:
> >>
> >> 1. Here is the output of .explain():
> >>>
> >>> Project [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS new_gender#42,fk_created_at_date#32 AS new_fk_created_at_date#43,age_range#30 AS new_age_range#44,first_name#27 AS new_first_name#45,last_name#28 AS new_last_name#46]
> >>> BroadcastNestedLoopJoin BuildLeft, LeftOuter, Some((((customer_id#1L = customer_id#25L) || (isnull(customer_id#1L) && isnull(customer_id#25L))) && ((country#2 = country#24) || (isnull(country#2) && isnull(country#24)))))
> >>>  Scan PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
> >>>  Scan ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
> >>
> >>
> >> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a difference; it still hangs indefinitely.
> >> 3. We are using Spark 1.5.2.
> >> 4. We tried running this with 4 executors, 9 executors, and even in local mode with master set to "local[4]". The issue still persists in all cases.
> >> 5. This issue still happens even without trying to cache any of the dataframes.
> >> 6. We have about 200 partitions.
> >>
> >> Any help would be appreciated!
> >>
> >> Best Regards,
> >> Mo
> >>
> >> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> >>>
> >>> Sorry,
> >>>
> >>> please add the following questions to the list above:
> >>>
> >>> the Spark version?
> >>> whether you are using RDDs or DataFrames?
> >>> whether the code runs locally, in Spark cluster mode, or on AWS EMR?
> >>>
> >>>
> >>> Regards,
> >>> Gourav Sengupta
> >>>
> >>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
> >>> <gourav.sengu...@gmail.com> wrote:
> >>>>
> >>>> Hi Tamara,
> >>>>
> >>>> A few basic questions first.
> >>>>
> >>>> How many executors are you using?
> >>>> Is all the data getting cached in the same executor?
> >>>> How many partitions do you have of the data?
> >>>> How many fields are you trying to use in the join?
> >>>>
> >>>> If you need any help finding answers to these questions, please let me
> >>>> know. From what I reckon, joins like yours should not take more than a
> >>>> few milliseconds.
> >>>>
> >>>>
> >>>> Regards,
> >>>> Gourav Sengupta
> >>>>
> >>>> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <t...@hellofresh.com> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I am running a Spark job that gets stuck attempting to join two
> >>>>> dataframes. The dataframes are not very large: one is about 2 million
> >>>>> rows, the other a couple of thousand rows, and the resulting joined
> >>>>> dataframe should be about the same size as the smaller one. I have
> >>>>> tried triggering execution of the join using the 'first' operator,
> >>>>> which as far as I understand should not require processing the entire
> >>>>> resulting dataframe (maybe I am mistaken though). The Spark UI is not
> >>>>> telling me anything, just showing the task to be stuck.
> >>>>>
> >>>>> When I run the exact same job on a slightly smaller dataset it works
> >>>>> without hanging.
> >>>>>
> >>>>> I have used the same environment to run joins on much larger
> >>>>> dataframes, so I am confused as to why in this particular case my
> >>>>> Spark job is just hanging. I have also tried running the same join
> >>>>> operation using pyspark on two 2-million-row dataframes (exactly like
> >>>>> the ones I am trying to join in the job that gets stuck) and it runs
> >>>>> successfully.
> >>>>>
> >>>>> I have tried caching the joined dataframe to see how much memory it
> >>>>> requires, but the job gets stuck on this action too. I have also tried
> >>>>> persisting the join to memory and disk, and the job seems to be stuck
> >>>>> all the same.
> >>>>>
> >>>>> Any help as to where to look for the source of the problem would be
> >>>>> much appreciated.
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Tamara
> >>>>>
> >>>>
> >>>
> >>
> >
>
>
