Actually the groupBy is not taking much time; the join I do later accounts for most (95%) of the time. Also, the grouping I am doing uses the DataFrame API, which does not expose any reduceByKey function. I believe the DataFrame automatically does a map-side partial aggregation (the reduceByKey equivalent) when we do a groupBy followed by agg.
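One way to confirm the partial aggregation is to print the physical plan. A minimal sketch, assuming a DataFrame named df shaped like the one in the job quoted below:

import org.apache.spark.sql.functions.sum

val aggregated = df
  .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
  .agg(sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))

// On 1.4 the plan should show two Aggregate operators: a partial
// (map-side) one and a final one -- the DataFrame equivalent of
// what reduceByKey does on the RDD API.
aggregated.explain()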
~Pratik

On Fri, Oct 23, 2015 at 1:38 PM Kartik Mathur <kar...@bluedata.com> wrote:

> Don't use groupBy; use reduceByKey instead. groupBy should always be
> avoided as it leads to a lot of shuffle reads/writes.
>
> On Fri, Oct 23, 2015 at 11:39 AM, pratik khadloya <tispra...@gmail.com>
> wrote:
>
>> Sorry, I sent the wrong join code snippet; the actual snippet is:
>>
>> aggImpsDf.join(
>>     aggRevenueDf,
>>     aggImpsDf("id_1") <=> aggRevenueDf("id_1")
>>       && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
>>       && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
>>       && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
>>     "inner")
>>   .select(
>>     aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
>>     aggImpsDf("day_hour_2"), aggImpsDf("metric1"),
>>     aggRevenueDf("metric2"))
>>   .coalesce(200)
>>
>> On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya <tispra...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Data about my Spark job is below. My source data is only 916.2 MB
>>> (stage 1) and 231.2 MB (stage 0), but when I join the two data sets
>>> (stage 2) it takes a very long time, and as I see, the shuffle read is
>>> 614.6 GB. Is this something expected? Both data sets produce 200
>>> partitions.
>>>
>>> Stage Id | Description                     | Submitted           | Duration | Tasks: Succeeded/Total | Input    | Output | Shuffle Read | Shuffle Write
>>> 2        | saveAsTable at Driver.scala:269 | 2015/10/22 18:48:12 | 2.3 h    | 200/200                |          |        | 614.6 GB     |
>>> 1        | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 2.1 min  | 8/8                    | 916.2 MB |        |              | 3.9 MB
>>> 0        | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 35 s     | 3/3                    | 231.2 MB |        |              | 4.8 MB
>>>
>>> I am running Spark 1.4.1, and my code snippet which produces the two
>>> data sets is:
>>>
>>> hc.sql(query)
>>>   .mapPartitions(iter => {
>>>     iter.map {
>>>       case Row(
>>>         ...
>>>       ) =>
>>>         ...
>>>     }
>>>   })
>>>   .toDF()
>>>   .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
>>>   .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
>>>     sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))
>>>
>>> Please advise on how to reduce the shuffle and speed this up.
>>>
>>> ~Pratik
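For reference, a minimal RDD sketch of the reduceByKey pattern Kartik suggests, assuming a hypothetical tuple layout over the same grouping columns (the names and types here are illustrative, not taken from the job):

import org.apache.spark.rdd.RDD

// Hypothetical row shape: (id_1, id_2, day_hour, day_hour_2, attr1, attr2).
def sumByKey(rows: RDD[(Long, Long, String, String, Double, Double)])
    : RDD[((Long, Long, String, String), (Double, Double))] =
  rows
    .map { case (id1, id2, dh, dh2, a1, a2) =>
      ((id1, id2, dh, dh2), (a1, a2))
    }
    // reduceByKey sums values map-side before the shuffle, so each
    // partition emits at most one partially aggregated record per key,
    // whereas RDD groupByKey ships every record across the network.
    .reduceByKey { case ((x1, x2), (y1, y2)) => (x1 + y1, x2 + y2) }

As noted in the reply above, DataFrame groupBy/agg already gets this map-side combine, so the RDD rewrite mainly matters for code still on the RDD API.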
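Since the 614.6 GB shuffle read in stage 2 dwarfs the roughly 1.1 GB of input, one hedged diagnostic (DataFrame names taken from the join snippet above) is to check whether duplicate or null join keys are multiplying rows in the inner join:

import org.apache.spark.sql.functions.desc

// If many rows share one (id_1, id_2, day_hour, day_hour_2) tuple on
// both sides, the inner join emits the cross product of the duplicates;
// note that <=> also matches null keys against each other.
aggImpsDf
  .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
  .count()
  .orderBy(desc("count"))
  .show(20)  // heaviest keys; repeat the same check for aggRevenueDf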