[ 
https://issues.apache.org/jira/browse/SPARK-10797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zoltán Zvara updated SPARK-10797:
---------------------------------
    Description: 
It seems that {{RDD.coalesce}} will unnecessarily write out (to shuffle files) 
temporary keys used on the shuffle code path. Consider the following code:

{code:title=RDD.scala|borderStyle=solid}
if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
{code}

{{ShuffledRDD}} will hash using {{position}} as the key, as assigned in the 
{{distributePartition}} function. After the bucket has been chosen by the 
sorter ({{ExternalSorter}} or {{BypassMergeSortShuffleWriter}}), the 
{{DiskBlockObjectWriter}} writes out both the (temporary) key and the value to 
the specified partition. In the next stage, after reading, we take only the 
values with {{PairRDDFunctions}}.

This certainly has a performance impact, as we unnecessarily write, read, and 
transform the {{Int}} keys alongside the data.
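
The key assignment described above can be sketched in plain Scala (no Spark dependency; {{CoalesceKeySketch}} and its {{distributePartition}} helper are illustrative names, not part of the Spark API). It shows that each element is paired with a temporary {{Int}} key whose only job is to pick a target partition, and which the downstream stage immediately discards via {{.values}}:

```scala
import scala.util.Random

object CoalesceKeySketch {
  // Mirrors the logic of distributePartition in RDD.coalesce: seed a Random
  // with the partition index, then assign monotonically increasing Int keys
  // so HashPartitioner spreads elements round-robin across partitions.
  def distributePartition[T](index: Int,
                             items: Iterator[T],
                             numPartitions: Int): Iterator[(Int, T)] = {
    var position = new Random(index).nextInt(numPartitions)
    items.map { t =>
      position += 1
      // This Int key is what gets written to the shuffle files, even though
      // the next stage only ever reads the values back.
      (position, t)
    }
  }

  def main(args: Array[String]): Unit = {
    val keyed = distributePartition(0, Iterator("a", "b", "c"), 4).toList
    // HashPartitioner would route each pair by key % numPartitions;
    // after the shuffle, only the values survive, so the keys are pure overhead.
    val values = keyed.map(_._2)
    println(values) // List(a, b, c)
  }
}
```

Because the keys are consecutive integers derived only from the partition index, they carry no information that the reader side needs, which is why writing them to disk is wasted I/O.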



> RDD's coalesce should not write out the temporary key
> -----------------------------------------------------
>
>                 Key: SPARK-10797
>                 URL: https://issues.apache.org/jira/browse/SPARK-10797
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Zoltán Zvara
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
