How many rows are you joining? How many rows in the output?
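If you are not sure, a quick way to check is to count both sides and the join
output (a rough sketch; joinedDf is just a placeholder name for the result of
the join in your snippet below, and each count() runs a job of its own):

    val leftRows  = aggImpsDf.count()
    val rightRows = aggRevenueDf.count()
    val outRows   = joinedDf.count()   // placeholder for the joined DataFrame
    println(s"left=$leftRows right=$rightRows joined=$outRows")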

Regards
Sab
On 24-Oct-2015 2:32 am, "pratik khadloya" <tispra...@gmail.com> wrote:

> Actually the groupBy is not taking a lot of time.
> The join that I do later takes most of the time (95%).
> Also, the grouping I am doing uses the DataFrame API, which does not
> expose any reduceByKey function... I guess the DataFrame automatically
> does a reduce-by-key style (map-side) aggregation when we do a groupBy.
>
> ~Pratik
>
> On Fri, Oct 23, 2015 at 1:38 PM Kartik Mathur <kar...@bluedata.com> wrote:
>
>> Don't use groupBy, use reduceByKey instead. groupBy should always be
>> avoided, as it leads to a lot of shuffle reads/writes.
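>>
>> For plain RDDs the difference looks roughly like this (a minimal sketch
>> with made-up pair data, not your actual schema):
>>
>>     import org.apache.spark.rdd.RDD
>>
>>     val pairs: RDD[(String, Long)] =
>>       sc.parallelize(Seq(("a", 1L), ("a", 2L), ("b", 3L)))
>>
>>     // groupByKey ships every value across the network before summing
>>     val viaGroup  = pairs.groupByKey().mapValues(_.sum)
>>
>>     // reduceByKey pre-aggregates on each partition, so far less is shuffled
>>     val viaReduce = pairs.reduceByKey(_ + _)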
>>
>> On Fri, Oct 23, 2015 at 11:39 AM, pratik khadloya <tispra...@gmail.com>
>> wrote:
>>
>>> Sorry, I sent the wrong join code snippet; the actual snippet is:
>>>
>>> aggImpsDf.join(
>>>    aggRevenueDf,
>>>    aggImpsDf("id_1") <=> aggRevenueDf("id_1")
>>>      && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
>>>      && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
>>>      && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
>>>    "inner")
>>>    .select(
>>>      aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
>>>      aggImpsDf("day_hour_2"), aggImpsDf("metric1"),
>>> aggRevenueDf("metric2"))
>>>    .coalesce(200)
>>>
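>>> To see where the shuffle happens, the physical plan can be printed before
>>> the save (joinedDf below is just a placeholder name for the result of the
>>> expression above):
>>>
>>>     joinedDf.explain(true)  // prints logical and physical plans; look for Exchange (shuffle) operators
>>>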
>>>
>>> On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya <tispra...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Data about my Spark job is below. My source data is only 916 MB (stage
>>>> 1) and 231 MB (stage 0), but when I join the two data sets (stage 2) it
>>>> takes a very long time, and as far as I can see the shuffled data is
>>>> 614 GB. Is this expected? Both data sets produce 200 partitions.
>>>>
>>>> Stage 2 - saveAsTable at Driver.scala:269
>>>>          submitted 2015/10/22 18:48:12, duration 2.3 h,
>>>>          tasks 200/200, shuffle 614.6 GB
>>>> Stage 1 - saveAsTable at Driver.scala:269
>>>>          submitted 2015/10/22 18:46:02, duration 2.1 min,
>>>>          tasks 8/8, input 916.2 MB, shuffle write 3.9 MB
>>>> Stage 0 - saveAsTable at Driver.scala:269
>>>>          submitted 2015/10/22 18:46:02, duration 35 s,
>>>>          tasks 3/3, input 231.2 MB, shuffle write 4.8 MB
>>>>
>>>> I am running Spark 1.4.1 and my code snippet which joins the two data
>>>> sets is:
>>>>
>>>> hc.sql(query).
>>>>     mapPartitions(iter => {
>>>>       iter.map {
>>>>         case Row(
>>>>          ...
>>>>          ...
>>>>          ...
>>>>         )
>>>>       }
>>>>     }
>>>>     ).toDF()
>>>>     .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
>>>>     .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
>>>>       sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))
>>>>
>>>>
>>>> Please advise on how to reduce the shuffle and speed this up.
>>>>
>>>>
>>>> ~Pratik
>>>>
>>>>
>>
