Note that, RDD partitions and Spark tasks are not always 1-1 mapping.

Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then
`rdd2` has 10 partitions, and there is no shuffle between `rdd1` and
`rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and
this stage has 10 tasks (decided by the last RDD). This means, each Spark
task will process 10 partitions of `rdd1`.

Looking at your example, I don't see where is the problem. Can you describe
what is not expected?

On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <szh.s...@gmail.com>
wrote:

> Well, it seems that I can still extend the CoalesceRDD to make it preserve
> the total number of partitions from the parent RDD, reduce some partitons
> in the same way as the original coalesce does for map-only jobs and fill
> the gaps (partitions which should reside on the positions of the coalesced
> ones) with just a special kind of partitions which do not have any parent
> dependencies and always return an empty iterator.
>
> I believe this should work as desired (at least the previous
> ShuffleMapStage will think that the number of partitons in the next stage,
> it generates shuffle output for, is not changed).
>
> There are few issues though - existence of empty partitions which can be
> evaluated almost for free and empty output files from these empty partitons
> which can be beaten by means of LazyOutputFormat in case of RDDs.
>
>
>
> On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:
>
>> although i personally would describe this as a bug the answer will be
>> that this is the intended behavior. the coalesce "infects" the shuffle
>> before it, making a coalesce useless for reducing output files after a
>> shuffle with many partitions b design.
>>
>> your only option left is a repartition for which you pay the price in
>> that it introduces another expensive shuffle.
>>
>> interestingly if you do a coalesce on a map-only job it knows how to
>> reduce the partitions and output files without introducing a shuffle, so
>> clearly it is possible, but i dont know how to get this behavior after a
>> shuffle in an existing job.
>>
>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com>
>> wrote:
>>
>>> Hello guys,
>>>
>>> Currently I'm a little bit confused with coalesce behaviour.
>>>
>>> Consider the following usecase - I'd like to join two pretty big RDDs.
>>> To make a join more stable and to prevent it from failures by OOM RDDs
>>> are usually repartitioned to redistribute data more evenly and to
>>> prevent every partition from hitting 2GB limit. Then after join with a
>>> lot of partitions.
>>>
>>> Then after successful join I'd like to save the resulting dataset.
>>> But I don't need such a huge amount of files as the number of
>>> partitions/tasks during joining. Actually I'm fine with such number of
>>> files as the total number of executor cores allocated to the job. So
>>> I've considered using a coalesce.
>>>
>>> The problem is that coalesce with shuffling disabled prevents join
>>> from using the specified number of partitions and instead forces join
>>> to use the number of partitions provided to coalesce
>>>
>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>>> false).toDebugString
>>> res5: String =
>>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>>>  |  CoalescedRDD[13] at repartition at <console>:25 []
>>>  |  ShuffledRDD[12] at repartition at <console>:25 []
>>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>>>     |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>>>
>>> With shuffling enabled everything is ok, e.g.
>>>
>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>>> true).toDebugString
>>> res6: String =
>>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>>>      |   MapPartitionsRDD[20] at repartition at <console>:25 []
>>>      |   CoalescedRDD[19] at repartition at <console>:25 []
>>>      |   ShuffledRDD[18] at repartition at <console>:25 []
>>>      +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>>>         |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>>>
>>> In that case the problem is that for pretty huge datasets additional
>>> reshuffling can take hours or at least comparable amount of time as
>>> for the join itself.
>>>
>>> So I'd like to understand whether it is a bug or just an expected
>>> behaviour?
>>> In case it is expected is there any way to insert additional
>>> ShuffleMapStage into an appropriate position of DAG but without
>>> reshuffling itself?
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>

Reply via email to