Note that RDD partitions and Spark tasks do not always map 1-to-1. Assume `rdd1` has 100 partitions and `rdd2 = rdd1.coalesce(10)`. Then `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and this stage has 10 tasks (the task count is decided by the last RDD of the stage). This means each Spark task will process 10 partitions of `rdd1`.
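To illustrate, here is a minimal spark-shell sketch (assuming an active SparkContext `sc`; the data and partition counts are arbitrary and chosen only to mirror the example above):

```scala
// Assumes a running spark-shell, where `sc` is the SparkContext.
val rdd1 = sc.parallelize(1 to 1000, 100) // 100 partitions
val rdd2 = rdd1.coalesce(10)              // narrow dependency, no shuffle

println(rdd1.getNumPartitions)            // 100
println(rdd2.getNumPartitions)            // 10

// The lineage shows no ShuffledRDD between rdd1 and rdd2, so both are
// evaluated in a single stage whose 10 tasks come from rdd2; each task
// reads 10 of rdd1's partitions.
println(rdd2.toDebugString)
```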
Looking at your example, I don't see where the problem is. Can you describe what is not expected?

On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:

> Well, it seems that I can still extend the CoalesceRDD to make it preserve
> the total number of partitions from the parent RDD, reduce some partitions
> in the same way as the original coalesce does for map-only jobs, and fill
> the gaps (partitions which should reside on the positions of the coalesced
> ones) with a special kind of partition which has no parent dependencies
> and always returns an empty iterator.
>
> I believe this should work as desired (at least the previous
> ShuffleMapStage will think that the number of partitions in the next
> stage, which it generates shuffle output for, is unchanged).
>
> There are a few issues though - the existence of empty partitions, which
> can be evaluated almost for free, and empty output files from these empty
> partitions, which can be avoided by means of LazyOutputFormat in the case
> of RDDs.
>
> On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:
>
>> although i personally would describe this as a bug, the answer will be
>> that this is the intended behavior. the coalesce "infects" the shuffle
>> before it, making a coalesce useless for reducing output files after a
>> shuffle with many partitions, by design.
>>
>> your only option left is a repartition, for which you pay the price of
>> another expensive shuffle.
>>
>> interestingly, if you do a coalesce on a map-only job it knows how to
>> reduce the partitions and output files without introducing a shuffle, so
>> clearly it is possible, but i don't know how to get this behavior after
>> a shuffle in an existing job.
>>
>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com>
>> wrote:
>>
>>> Hello guys,
>>>
>>> Currently I'm a little bit confused by coalesce behaviour.
>>>
>>> Consider the following use case: I'd like to join two pretty big RDDs.
>>> To make the join more stable and to prevent it from failing with OOM,
>>> the RDDs are usually repartitioned to redistribute data more evenly and
>>> to keep every partition from hitting the 2GB limit, which leaves the
>>> join with a lot of partitions.
>>>
>>> Then, after a successful join, I'd like to save the resulting dataset.
>>> But I don't need such a huge number of files as the number of
>>> partitions/tasks used during the join. Actually, I'm fine with a number
>>> of files equal to the total number of executor cores allocated to the
>>> job. So I've considered using a coalesce.
>>>
>>> The problem is that coalesce with shuffling disabled prevents the join
>>> from using the specified number of partitions and instead forces the
>>> join to use the number of partitions provided to coalesce:
>>>
>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, false).toDebugString
>>> res5: String =
>>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>>>  |  CoalescedRDD[13] at repartition at <console>:25 []
>>>  |  ShuffledRDD[12] at repartition at <console>:25 []
>>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>>>     |  ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>>>
>>> With shuffling enabled everything is ok, e.g.
>>>
>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, true).toDebugString
>>> res6: String =
>>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>>>     |  MapPartitionsRDD[20] at repartition at <console>:25 []
>>>     |  CoalescedRDD[19] at repartition at <console>:25 []
>>>     |  ShuffledRDD[18] at repartition at <console>:25 []
>>>  +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>>>     |  ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>>>
>>> In that case the problem is that, for pretty huge datasets, the
>>> additional reshuffling can take hours, or at least an amount of time
>>> comparable to that of the join itself.
>>>
>>> So I'd like to understand whether this is a bug or just the expected
>>> behaviour. In case it is expected, is there any way to insert an
>>> additional ShuffleMapStage into the appropriate position of the DAG,
>>> but without the reshuffling itself?
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org