well, an interesting side effect of this is that i can now control the number of
partitions for every shuffle in a dataframe job, as opposed to having a single
setting for the number of partitions across all shuffles.

basically i can set spark.sql.shuffle.partitions to some huge number, and then
follow every groupByKey (or any other shuffle operation) with a coalesce to set
that shuffle's number of partitions. it's like i have numPartitions back from
those good old RDD shuffle methods :)
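for example, roughly (a sketch only; column and path names are made up):

    import org.apache.spark.sql.SparkSession

    // global default set very high so it never caps a shuffle on its own
    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "100000")
      .getOrCreate()

    val df = spark.read.parquet("/data/events")  // hypothetical input

    // the planner folds each coalesce into the shuffle right before it,
    // so this aggregation's shuffle runs with 2048 partitions...
    val byUser = df.groupBy("userId").count().coalesce(2048)

    // ...while this one runs with 256, in the same job
    val byCountry = df.groupBy("country").count().coalesce(256)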
On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers <ko...@tresata.com> wrote:

> a new map task after a shuffle is also a narrow dependency, isn't it? it's
> narrow because data doesn't need to move, e.g. every partition depends on a
> single partition, preferably on the same machine.
>
> modifying a previous shuffle to avoid a shuffle strikes me as odd, and can
> potentially make a mess of performance, especially when no shuffle is
> needed. just a new map task.
>
> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> > shouldn't coalesce introduce a new map-phase with fewer tasks instead
>> > of changing the previous shuffle?
>>
>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>> results in a narrow dependency, hence no shuffle.
>>
>> So it is pretty clear that you need to use "repartition". Not sure
>> there's any available trick to achieve it without calling repartition.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937
>>
>> On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> sorry i meant to say:
>>> with a checkpoint i get a map phase with lots of tasks to read the data,
>>> then a reduce phase with 2048 reducers, and then finally a map phase
>>> with 100 tasks.
>>>
>>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> the only thing that seems to stop this so far is a checkpoint.
>>>>
>>>> with a checkpoint i get a map phase with lots of tasks to read the
>>>> data, then a reduce phase with 2048 reducers, and then finally a map
>>>> phase with 4 tasks.
>>>>
>>>> now i need to figure out how to do this without having to checkpoint.
>>>> i wish i could insert something like a dummy operation that logical
>>>> steps cannot jump over.
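(for concreteness, a sketch of the checkpoint workaround described above,
reusing the hypothetical spark and df from the earlier sketch: checkpoint
materializes the shuffled result, so the planner can no longer fold the
later coalesce into the groupBy's shuffle)

    // a checkpoint directory must be configured first
    spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

    val counts = df.groupBy("userId").count()
      .checkpoint()  // materializes: the shuffle keeps its configured partition count
      .coalesce(100) // now this only narrows the final map phase to 100 tasks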
>>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>>
>>>>> ok thanks.
>>>>>
>>>>> mhhhhh. that seems odd. shouldn't coalesce introduce a new map-phase
>>>>> with fewer tasks instead of changing the previous shuffle?
>>>>>
>>>>> using repartition seems too expensive just to keep the number of
>>>>> files down. so i guess i am back to looking for another solution.
>>>>>
>>>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <va...@datadoghq.com> wrote:
>>>>>
>>>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>>>> have to use `repartition` instead, which is going to introduce an
>>>>>> extra shuffle stage
>>>>>>
>>>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com> wrote:
>>>>>> >
>>>>>> > one small correction: lots of files leads to pressure on the spark
>>>>>> > driver program when reading this data in spark.
>>>>>> >
>>>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>>>> >>
>>>>>> >> hi,
>>>>>> >>
>>>>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>>>>> >> on a given column with a count, and finally i coalesce to a smaller
>>>>>> >> number of partitions before writing out to disk. so roughly:
>>>>>> >>
>>>>>> >> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>>>>> >>
>>>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>>>> >>
>>>>>> >> i expect to see 2048 partitions in the shuffle. what i am seeing
>>>>>> >> instead is a shuffle with only 100 partitions. it's like the
>>>>>> >> coalesce has taken over the partitioning of the groupBy.
>>>>>> >>
>>>>>> >> any idea why?
>>>>>> >>
>>>>>> >> i am doing the coalesce because it is not helpful to write out 2048
>>>>>> >> files: lots of files leads to pressure down the line on executors
>>>>>> >> reading this data (i am writing to just one partition of a larger
>>>>>> >> dataset), and since i have fewer than 100 executors i expect it to
>>>>>> >> be efficient. so it sounds like a good idea, no?
>>>>>> >>
>>>>>> >> but i do need 2048 partitions in my shuffle due to the operation i
>>>>>> >> am doing in the groupBy (in my real problem i am not just doing a
>>>>>> >> count...).
>>>>>> >>
>>>>>> >> thanks!
>>>>>> >> koert
>>>>>>
>>>>>> --
>>>>>> Sent from my iPhone
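(and for completeness, the repartition variant suggested above, applied to
the pipeline from the original question; the format and column name are
placeholders, and a SparkSession named spark is assumed)

    // the groupBy shuffles with spark.sql.shuffle.partitions = 2048 as
    // expected; repartition then adds a second shuffle down to 100
    // partitions, so only 100 files are written
    spark.read.parquet("/data/in")
      .groupBy("column").count()
      .repartition(100)
      .write.parquet("/data/out")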