[jira] [Updated] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union

2018-05-15 Thread Marco Gaido (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marco Gaido updated SPARK-23778:

Priority: Trivial  (was: Minor)

> SparkContext.emptyRDD confuses SparkContext.union
> -
>
> Key: SPARK-23778
> URL: https://issues.apache.org/jira/browse/SPARK-23778
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.3.0, 2.3.0
> Reporter: Stefano Pettini
> Priority: Trivial
> Attachments: as_it_should_be.png, partitioner_lost_and_unneeded_extra_stage.png
>
>
> SparkContext.emptyRDD returns an RDD without a partitioner. Since it is empty, 
> whether it is partitioned or not should be a purely academic question. 
> Unfortunately it is not, and the missing partitioner has side effects: 
> namely, it confuses the RDD union.
> When N ordinary RDDs are all partitioned the same way, the union is 
> implemented with the optimized PartitionerAwareUnionRDD, which retains the 
> common partitioner in the result. If one of the N RDDs happens to be an 
> emptyRDD, which has no partitioner, the union is instead implemented by 
> simply appending all the partitions of the N RDDs, dropping the partitioner. 
> There is no need for this, as the emptyRDD contains no elements. The lost 
> partitioner causes unneeded shuffles once the result of the union is used.
> See for example:
> {{import org.apache.spark.HashPartitioner}}
> {{val p = new HashPartitioner(3)}}
> {{val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)}}
> {{val b1 = a.mapValues(_ + 1)}}
> {{val b2 = a.mapValues(_ - 1)}}
> {{val e = context.emptyRDD[(Int, Int)]}}
> {{val x = context.union(a, b1, b2, e)}}
> {{val y = x.reduceByKey(_ + _)}}
> {{assert(x.partitioner.contains(p))}}
> {{y.collect()}}
> The assert fails. With the assert disabled, it's possible to see that 
> reduceByKey introduces a shuffle, although all the input RDDs except the 
> emptyRDD are already partitioned the same way.
> Forcing a partitioner on the emptyRDD:
> {{val e = context.emptyRDD[(Int, Int)].partitionBy(p)}}
> makes the assert pass and avoids the unneeded extra stage and shuffle.
> The union implementation should be changed to disregard the missing 
> partitioner of emptyRDDs and treat them as _partitioned in a way compatible 
> with any partitioner_, in effect ignoring them.
> Present since 1.3 at least.
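
The selection rule described in the report, and the proposed change to it, can be sketched without Spark. This is only a simplified model, not the actual SparkContext.union source: ModelRdd, UnionKind, currentRule and proposedRule are hypothetical names invented for the illustration, partitioners are represented as plain strings, and it assumes that an emptyRDD can be recognised by having zero partitions.

```scala
// Simplified, Spark-free model of how a union could pick its implementation.
// Every name below is hypothetical and exists only for this illustration.
object UnionModel {
  // Stand-in for an RDD: only the two properties the decision depends on.
  final case class ModelRdd(partitioner: Option[String], numPartitions: Int)

  sealed trait UnionKind
  // Models PartitionerAwareUnionRDD: the common partitioner is retained.
  final case class PartitionerAware(partitioner: String) extends UnionKind
  // Models the plain union: partitions are appended, the partitioner is dropped.
  case object PlainUnion extends UnionKind

  // Behaviour described in the report: every input must carry the same partitioner.
  def currentRule(rdds: Seq[ModelRdd]): UnionKind = {
    val partitioners = rdds.flatMap(_.partitioner).toSet
    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1)
      PartitionerAware(partitioners.head)
    else
      PlainUnion
  }

  // Proposed behaviour (assumption: zero partitions identifies an empty RDD).
  // Inputs that cannot contribute elements are skipped before comparing.
  def proposedRule(rdds: Seq[ModelRdd]): UnionKind =
    currentRule(rdds.filter(_.numPartitions > 0))

  def main(args: Array[String]): Unit = {
    val a  = ModelRdd(Some("hash(3)"), 3)
    val b1 = ModelRdd(Some("hash(3)"), 3)
    val b2 = ModelRdd(Some("hash(3)"), 3)
    val e  = ModelRdd(None, 0) // models context.emptyRDD

    println(currentRule(Seq(a, b1, b2, e)))  // plain union, partitioner lost
    println(proposedRule(Seq(a, b1, b2, e))) // partitioner-aware union
  }
}
```

In this model the single unpartitioned input is enough to push currentRule to the plain union, which is the situation the failing assert exposes, while proposedRule keeps the common partitioner because the empty input is never consulted.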



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union

2018-03-23 Thread Stefano Pettini (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefano Pettini updated SPARK-23778:

Attachment: as_it_should_be.png

> SparkContext.emptyRDD confuses SparkContext.union
> -
>
> Key: SPARK-23778
> URL: https://issues.apache.org/jira/browse/SPARK-23778
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.3.0, 2.3.0
> Reporter: Stefano Pettini
> Priority: Minor
> Attachments: as_it_should_be.png, partitioner_lost_and_unneeded_extra_stage.png
>
>






[jira] [Updated] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union

2018-03-23 Thread Stefano Pettini (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefano Pettini updated SPARK-23778:

Attachment: partitioner_lost_and_unneeded_extra_stage.png

> SparkContext.emptyRDD confuses SparkContext.union
> -
>
> Key: SPARK-23778
> URL: https://issues.apache.org/jira/browse/SPARK-23778
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.3.0, 2.3.0
> Reporter: Stefano Pettini
> Priority: Minor
> Attachments: partitioner_lost_and_unneeded_extra_stage.png
>
>


