[jira] [Updated] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union
[ https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marco Gaido updated SPARK-23778:
    Priority: Trivial  (was: Minor)

> SparkContext.emptyRDD confuses SparkContext.union
> -------------------------------------------------
>
>                 Key: SPARK-23778
>                 URL: https://issues.apache.org/jira/browse/SPARK-23778
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.0, 2.3.0
>            Reporter: Stefano Pettini
>            Priority: Trivial
>         Attachments: as_it_should_be.png, partitioner_lost_and_unneeded_extra_stage.png
>
> SparkContext.emptyRDD is an unpartitioned RDD. Since it is empty, whether it is partitioned or not should be a purely academic question. Unfortunately it is not, and the distinction has side effects: it confuses RDD union.
>
> When N classic RDDs are partitioned the same way, their union is implemented with the optimized PartitionerAwareUnionRDD, which retains the common partitioner in the result. If one of the N RDDs happens to be an emptyRDD, which has no partitioner, the union falls back to simply appending all the partitions of the N RDDs, dropping the partitioner. This fallback is unnecessary, because the emptyRDD contributes no elements, and it causes further unneeded shuffles once the result of the union is used.
>
> See for example:
> {{val p = new HashPartitioner(3)}}
> {{val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)}}
> {{val b1 = a.mapValues(_ + 1)}}
> {{val b2 = a.mapValues(_ - 1)}}
> {{val e = context.emptyRDD[(Int, Int)]}}
> {{val x = context.union(a, b1, b2, e)}}
> {{val y = x.reduceByKey(_ + _)}}
> {{assert(x.partitioner.contains(p))}}
> {{y.collect()}}
> The assert fails. With the assert disabled, one can see that reduceByKey introduces a shuffle, even though all the input RDDs except the emptyRDD are already partitioned the same way.
>
> Forcing a partitioner on the emptyRDD:
> {{val e = context.emptyRDD[(Int, Int)].partitionBy(p)}}
> solves the problem with the assert and avoids the unneeded extra stage and shuffle.
>
> The union implementation should be changed to ignore the partitioner of emptyRDDs and to treat them as _partitioned in a way compatible with any partitioner_, effectively skipping them.
>
> Present since 1.3 at least.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
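The proposed rule above can be sketched without Spark. The snippet below is a simplified, Spark-free illustration only: `MiniRdd` and `unionPartitioner` are hypothetical stand-ins, not Spark internals, and the partitioner is modeled as a plain string. It shows how the union's partitioner choice could ignore empty inputs so that one emptyRDD no longer forces the fallback path.

```scala
// Hypothetical stand-in for an RDD: element count plus an optional partitioner.
case class MiniRdd(numElements: Int, partitioner: Option[String])

// Returns the common partitioner the union result could retain.
// Empty RDDs are treated as compatible with any partitioner and skipped.
def unionPartitioner(rdds: Seq[MiniRdd]): Option[String] = {
  val nonEmpty = rdds.filter(_.numElements > 0)
  nonEmpty.map(_.partitioner).distinct match {
    case Seq(Some(p)) => Some(p) // all non-empty inputs share one partitioner
    case _            => None    // mixed or missing partitioners: plain append
  }
}

val p  = Some("HashPartitioner(3)")
val a  = MiniRdd(5, p)
val b1 = MiniRdd(5, p)
val b2 = MiniRdd(5, p)
val e  = MiniRdd(0, None) // emptyRDD: no elements, no partitioner

// With the proposed rule the empty input no longer drops the partitioner:
println(unionPartitioner(Seq(a, b1, b2, e))) // prints Some(HashPartitioner(3))
```

Under this rule, the `assert(x.partitioner.contains(p))` in the reproduction above would pass without having to call `partitionBy(p)` on the emptyRDD.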
[jira] [Updated] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union

[ https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefano Pettini updated SPARK-23778:
    Attachment: as_it_should_be.png
[jira] [Updated] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union

[ https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefano Pettini updated SPARK-23778:
    Attachment: partitioner_lost_and_unneeded_extra_stage.png