[ https://issues.apache.org/jira/browse/SPARK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Brutschy updated SPARK-33356:
-----------------------------------
    Description: 
The current implementation of the {{DAGScheduler}} exhibits exponential runtime
in DAGs with many {{PartitionerAwareUnionRDD}}s. The reason seems to be a mutual
recursion between {{PartitionerAwareUnionRDD.getPreferredLocations}} and
{{DAGScheduler.getPreferredLocs}}.

A minimal example reproducing the issue:
{code:scala}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Example extends App {
  val partitioner = new HashPartitioner(2)
  val sc = new SparkContext(
    new SparkConf().setAppName("").setMaster("local[*]"))
  // An empty, pre-partitioned RDD, so collecting it does no actual work.
  val rdd1 = sc.emptyRDD[(Int, Int)].partitionBy(partitioner)
  // 30 references to the same RDD, reduced into a chain of 29 nested
  // partitioner-aware unions.
  val rdd2 = (1 to 30).map(_ => rdd1)
  val rdd3 = rdd2.reduce(_ union _)
  rdd3.collect()
}
{code}
The whole app should take about a second to complete, since no actual work is
done. However, merely submitting the job takes longer than I am willing to wait.

The underlying cause appears to be the mutual recursion between
{{PartitionerAwareUnionRDD.getPreferredLocations}} and
{{DAGScheduler.getPreferredLocs}}: the graph traversal is restarted at each
{{PartitionerAwareUnionRDD}} with no memoization across calls, so each node of
the DAG is visited exponentially many times.
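The pattern can be illustrated without Spark. Below is a minimal, self-contained
sketch of the recursion (hypothetical {{Node}}, {{schedulerPrefLocs}} and
{{nodePrefLocs}} stand-ins, not the actual Spark sources), under the assumption
that the scheduler first asks a node for its own preferred locations (which, for
a partitioner-aware union, calls back into the scheduler with a fresh visited
set) and, finding none, then recurses into the parents within the current
traversal:
{code:scala}
import scala.collection.mutable

object RecursionSketch extends App {
  // Hypothetical stand-in for an RDD in the DAG.
  final case class Node(id: Int, parents: Seq[Node])

  var visits = 0L

  // Stand-in for DAGScheduler.getPreferredLocs: every call starts a fresh
  // traversal, so the per-call `visited` set gives no memoization across calls.
  def schedulerPrefLocs(node: Node): Seq[String] = {
    val visited = mutable.Set.empty[Node]
    def internal(n: Node): Seq[String] =
      if (!visited.add(n)) Nil
      else {
        visits += 1
        val own = nodePrefLocs(n)          // may call back into the "scheduler"
        if (own.nonEmpty) own
        else n.parents.flatMap(internal)   // fall back to parents in this traversal
      }
    internal(node)
  }

  // Stand-in for PartitionerAwareUnionRDD.getPreferredLocations: it asks the
  // "scheduler" for each parent, restarting the traversal with an empty cache.
  def nodePrefLocs(n: Node): Seq[String] =
    if (n.parents.isEmpty) Nil
    else n.parents.flatMap(schedulerPrefLocs)

  // A chain of 20 binary unions over the same leaf, mirroring rdd2.reduce(_ union _).
  val leaf = Node(0, Nil)
  val root = (1 to 20).foldLeft(leaf)((acc, i) => Node(i, Seq(acc, leaf)))

  schedulerPrefLocs(root)
  println(s"nodes visited: $visits") // roughly doubles with every additional union
}
{code}
A traversal that memoized results across these calls would visit each node only
once.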

Note that {{sc.union(rdd2)}} could obviously be used instead of
{{rdd2.reduce(_ union _)}} to avoid the problem; the reduce form is used here
only to demonstrate the issue in a sufficiently small example. Given a large DAG
with many PartitionerAwareUnionRDDs, especially one constructed by an iterative
algorithm, the problem can become relevant even without such "abuse" of the
union operation.
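For reference, a sketch of that workaround applied to the example above (same
{{sc}}, {{rdd1}} and {{rdd2}} as before):
{code:scala}
// One flat union over all 30 RDDs instead of 29 nested binary unions.
// The DAG then contains a single union node and job submission completes quickly.
val rdd3 = sc.union(rdd2)
rdd3.collect()
{code}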

The exponential recursion in the DAG Scheduler was largely fixed by SPARK-682,
but in the special case of {{PartitionerAwareUnionRDD}} it can still occur. This
may actually be an underlying cause of SPARK-29181.


> DAG Scheduler exhibits exponential runtime with PartitionerAwareUnion
> ---------------------------------------------------------------------
>
>                 Key: SPARK-33356
>                 URL: https://issues.apache.org/jira/browse/SPARK-33356
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.2, 3.0.1
>         Environment: Reproducible locally with 3.0.1, 2.4.2, and latest 
> master.
>            Reporter: Lucas Brutschy
>            Priority: Minor
>



