[ https://issues.apache.org/jira/browse/SPARK-10797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zoltán Zvara updated SPARK-10797:
---------------------------------
Description:

It seems that {{RDD.coalesce}} unnecessarily writes out (to shuffle files) the temporary keys used on the shuffle code path. Consider the following code:

{code:title=RDD.scala|borderStyle=solid}
if (shuffle) {
  /** Distributes elements evenly across output partitions, starting from a random partition. */
  val distributePartition = (index: Int, items: Iterator[T]) => {
    var position = (new Random(index)).nextInt(numPartitions)
    items.map { t =>
      // Note that the hash code of the key will just be the key itself. The HashPartitioner
      // will mod it with the number of total partitions.
      position = position + 1
      (position, t)
    }
  } : Iterator[(Int, T)]
  // include a shuffle step so that our upstream tasks are still distributed
  new CoalescedRDD(
    new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
    numPartitions).values
} else {
{code}

{{ShuffledRDD}} hashes on {{position}} as the key, as assigned in the {{distributePartition}} function. After the bucket has been chosen by the sorter ({{ExternalSorter}} or {{BypassMergeSortShuffleWriter}}), {{DiskBlockObjectWriter}} writes out both the (temporary) key and the value to the chosen partition. In the next stage, after reading, we take only the values with {{PairRDDFunctions}}. This has a real performance impact, as we unnecessarily write and read one {{Int}} per record and transform the data.

> RDD's coalesce should not write out the temporary key
> -----------------------------------------------------
>
>                 Key: SPARK-10797
>                 URL: https://issues.apache.org/jira/browse/SPARK-10797
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Zoltán Zvara
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
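For illustration, the round-robin keying done by {{distributePartition}} can be reproduced in plain Scala, without Spark. This is a minimal sketch (the object and helper names are local to this example, not part of Spark); it shows that every element gets paired with a temporary {{Int}} key before the shuffle, which is exactly the key that later gets written to disk next to the value:

```scala
import scala.util.Random

object CoalesceKeySketch {
  // Mimics Spark's distributePartition: picks a starting partition from a
  // RNG seeded with the source partition index, then assigns consecutive
  // Int keys so HashPartitioner (key % numPartitions) spreads elements
  // round-robin across the output partitions.
  def distributePartition[T](numPartitions: Int)(
      index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
    var position = new Random(index).nextInt(numPartitions)
    items.map { t =>
      position = position + 1
      (position, t) // the Int key here is the "temporary key" the issue is about
    }
  }

  def main(args: Array[String]): Unit = {
    val keyed = distributePartition[String](4)(0, Iterator("a", "b", "c")).toList
    println(keyed)
    // Consecutive temporary keys differ by exactly 1 (round-robin),
    // so the key carries no information the reader ever uses: the next
    // stage drops it with .values.
    assert(keyed.map(_._1).sliding(2).forall { case Seq(a, b) => b == a + 1 })
  }
}
```

The sketch makes the cost visible: each record written to the shuffle file is an {{(Int, T)}} pair rather than a bare {{T}}, even though the downstream stage immediately discards the {{Int}}.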