Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Koert Kuipers
Thanks for that long reply, Jungtaek! So when I set spark.sql.shuffle.partitions to 2048 I have 2048 data partitions (or "partitions of state"), and these are determined by a hashing function. OK, got it! When I look at the application manager I also see 2048 "tasks" for the relevant stage. So tasks ...
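A minimal sketch of the setup being described, assuming a local Spark 2.x session (with the 2018-era defaults; newer adaptive execution may coalesce the result further):

    import org.apache.spark.sql.SparkSession

    object ShufflePartitionsDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("shuffle-partitions-demo")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        // Every shuffle introduced by a DataFrame aggregation uses this many
        // hash partitions, and hence this many reduce tasks show up per stage.
        spark.conf.set("spark.sql.shuffle.partitions", "2048")

        val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("key", "value")
        val counted = df.groupBy("key").count()

        // The shuffled result has 2048 partitions, matching the setting above.
        println(counted.rdd.getNumPartitions)

        spark.stop()
      }
    }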

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Jungtaek Lim
I could be wrong, so anyone is welcome to correct me if I'm missing something here. You can think of Spark operators in a narrow dependency as wrapped functions applied to an iterator (like "op3(op2(op1(iter)))"), and under that model there is no way to adjust the number of partitions. Each partition is ...
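A toy illustration of that "op3(op2(op1(iter)))" model in plain Scala (not Spark internals): each narrow operator is just a function from iterator to iterator, so composing them can transform a partition's rows but can never change how many partitions exist.

    object NarrowPipelineSketch {
      // A narrow operator, modelled as a function over one partition's iterator.
      type Op[A, B] = Iterator[A] => Iterator[B]

      val op1: Op[Int, Int]    = it => it.map(_ + 1)          // e.g. a map
      val op2: Op[Int, Int]    = it => it.filter(_ % 2 == 0)  // e.g. a filter
      val op3: Op[Int, String] = it => it.map(_.toString)     // another map

      def main(args: Array[String]): Unit = {
        // One task processing one partition: the whole narrow pipeline is
        // just function composition over that partition's iterator.
        val partition: Iterator[Int] = Iterator(1, 2, 3, 4, 5)
        val result = op3(op2(op1(partition)))
        println(result.toList)  // List("2", "4", "6")
      }
    }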

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Koert Kuipers
Well, an interesting side effect of this is that I can now control the number of partitions for every shuffle in a DataFrame job, as opposed to having a single setting for the number of partitions across all shuffles. Basically I can set spark.sql.shuffle.partitions to some huge number, and then for ...
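A sketch of the pattern being described, with hypothetical paths and column names, assuming an existing SparkSession named spark: set spark.sql.shuffle.partitions very high once, then follow each aggregation with a coalesce that picks how many tasks that particular shuffle's reduce stage runs with.

    import org.apache.spark.sql.functions.sum

    spark.conf.set("spark.sql.shuffle.partitions", "100000")  // "some huge number"

    val events = spark.read.parquet("/data/events")           // hypothetical input

    val perUser = events
      .groupBy("user_id", "country")
      .count()
      .coalesce(2048)   // first shuffle: its reduce stage runs with 2048 tasks

    val perCountry = perUser
      .groupBy("country")
      .agg(sum("count").as("users"))
      .coalesce(64)     // second shuffle: a different width for this one

    perCountry.write.parquet("/data/per_country")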

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
A new map task after a shuffle is also a narrow dependency, isn't it? It's narrow because data doesn't need to move, i.e. every partition depends on a single partition, preferably on the same machine. Modifying a previous shuffle to avoid a shuffle strikes me as odd, and can potentially make a mess of ...

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Jungtaek Lim
> Shouldn't coalesce introduce a new map phase with fewer tasks instead of changing the previous shuffle? The javadoc of Dataset.coalesce [1] describes this behavior clearly: it results in a narrow dependency, hence no shuffle. So it is pretty clear that you need to use "repartition". Not sure ...
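One way to see this for yourself (column names and paths are made up; spark is an existing SparkSession): compare the physical plans. coalesce adds no new exchange, so its partition count is imposed on the existing shuffle's reduce side, while repartition introduces a genuine extra shuffle.

    val counts = spark.read.parquet("/data/events").groupBy("user_id").count()

    counts.coalesce(10).explain()
    // The plan shows something like "Coalesce 10" sitting on top of the
    // aggregation, with no additional Exchange node.

    counts.repartition(10).explain()
    // The plan gains an extra exchange (round-robin partitioning into 10),
    // i.e. a new shuffle stage after the aggregation.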

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
Sorry, I meant to say: with a checkpoint I get a map phase with lots of tasks to read the data, then a reduce phase with 2048 reducers, and then finally a map phase with 100 tasks. On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers wrote: > the only thing that seems to stop this so far is a ...

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
The only thing that seems to stop this so far is a checkpoint. With a checkpoint I get a map phase with lots of tasks to read the data, then a reduce phase with 2048 reducers, and then finally a map phase with 4 tasks. Now I need to figure out how to do this without having to checkpoint. I wish I ...
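A sketch of the checkpoint workaround described here, with a hypothetical checkpoint directory and input path: checkpointing materializes the aggregation and cuts the lineage, so the coalesce that follows can no longer fold back into the shuffle's reduce stage.

    spark.sparkContext.setCheckpointDir("/tmp/checkpoints")   // hypothetical dir
    spark.conf.set("spark.sql.shuffle.partitions", "2048")

    val counts = spark.read.parquet("/data/events")
      .groupBy("user_id")
      .count()
      .checkpoint()   // eager by default: the 2048-task reduce phase runs here

    // A small, separate map phase: a handful of tasks reading the
    // checkpointed result and writing it out.
    counts.coalesce(4).write.parquet("/data/user_counts")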

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
OK, thanks. Hmm, that seems odd. Shouldn't coalesce introduce a new map phase with fewer tasks instead of changing the previous shuffle? Using repartition seems too expensive just to keep the number of files down, so I guess I am back to looking for another solution. On Wed, Aug 8, 2018 at ...

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Vadim Semenov
`coalesce` sets the number of partitions for the last stage, so you have to use `repartition` instead, which is going to introduce an extra shuffle stage. On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers wrote: > > one small correction: lots of files leads to pressure on the spark driver > program ...
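The suggested alternative as code (hypothetical paths and column names, spark being an existing SparkSession): the groupBy shuffle keeps its full parallelism, and a second shuffle then brings the result down to 100 partitions, i.e. 100 output files.

    spark.read.parquet("/data/events")
      .groupBy("user_id")
      .count()           // reduce stage uses spark.sql.shuffle.partitions tasks
      .repartition(100)  // extra shuffle stage, but only 100 output partitions
      .write.parquet("/data/user_counts")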

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
One small correction: lots of files lead to pressure on the Spark driver program when reading this data back in Spark. On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers wrote: > hi, > > i am reading data from files into a dataframe, then doing a groupBy for a > given column with a count, and finally i ...

groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
Hi, I am reading data from files into a DataFrame, then doing a groupBy on a given column with a count, and finally I coalesce to a smaller number of partitions before writing out to disk. So roughly:
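The archived snippet cuts off right where the code began; a minimal sketch of the pipeline as described, with hypothetical paths and column names, assuming an existing SparkSession named spark:

    val df = spark.read.parquet("/data/events")   // read data from files

    df.groupBy("some_column")                     // groupBy for a given column
      .count()                                    //   ... with a count
      .coalesce(100)                              // coalesce to fewer partitions
      .write.parquet("/data/counts")              // write out to disk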