[ https://issues.apache.org/jira/browse/SPARK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Brutschy updated SPARK-33356:
-----------------------------------
Description:
The current implementation of the {{DAGScheduler}} exhibits exponential runtime in DAGs with many {{PartitionerAwareUnions}}. The reason seems to be a mutual recursion between {{PartitionerAwareUnion.getPreferredLocations}} and {{DAGScheduler.getPreferredLocs}}.

A minimal example reproducing the issue:
{code:scala}
object Example extends App {
  val partitioner = new HashPartitioner(2)
  val sc = new SparkContext(new SparkConf().setAppName("").setMaster("local[*]"))
  val rdd1 = sc.emptyRDD[(Int, Int)].partitionBy(partitioner)
  val rdd2 = (1 to 30).map(_ => rdd1)
  val rdd3 = rdd2.reduce(_ union _)
  rdd3.collect()
}
{code}

The whole app should take around one second to complete, as no actual work is done. However, it takes more time to submit the job than I am willing to wait.

The underlying cause appears to be mutual recursion between {{PartitionerAwareUnion.getPreferredLocations}} and {{DAGScheduler.getPreferredLocs}}, which restarts graph traversal at each {{PartitionerAwareUnion}} with no memoization. Each node of the DAG is visited {{O(n!)}} (exponentially many) times.

Note that it is clear to me that you could use {{sc.union(rdd2)}} instead of {{rdd2.reduce(_ union _)}} to eliminate the problem; I use the latter only to demonstrate the issue in a sufficiently small example. Given a large DAG and many PartitionerAwareUnions, especially ones constructed by iterative algorithms, the problem can become relevant even without "abuse" of the union operation.

The exponential recursion in the {{DAGScheduler}} was largely fixed with SPARK-682, but in the special case of PartitionerAwareUnion it is still possible. This may actually be an underlying cause of SPARK-29181.
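To make the claimed blow-up concrete, the following is a minimal, self-contained sketch in plain Scala (no Spark involved). {{Node}}, {{getPreferredLocs}} and {{nodePreferredLocations}} are illustrative stand-ins, not actual Spark APIs; the sketch only assumes the pattern described above: the scheduler first asks each node for its own preferred locations, a partitioner-aware union answers that by restarting a full traversal on every parent, and when nothing is found the scheduler also walks the parents itself, all without memoization.
{code:scala}
// Simplified model of the mutual recursion (not actual Spark code).
// "Node" plays the role of an RDD; the two functions mirror the roles of
// DAGScheduler.getPreferredLocs and PartitionerAwareUnionRDD.getPreferredLocations.
object RecursionModel extends App {
  final case class Node(id: Int, parents: Seq[Node])

  var visits = 0L

  // Scheduler side (stand-in for DAGScheduler.getPreferredLocs): ask the node
  // itself first; if that yields nothing, fall back to walking the parents.
  def getPreferredLocs(n: Node): Seq[String] = {
    val own = nodePreferredLocations(n)
    if (own.nonEmpty) own else n.parents.flatMap(getPreferredLocs)
  }

  // Union side (stand-in for PartitionerAwareUnionRDD.getPreferredLocations):
  // restarts a full scheduler traversal on every parent, with no memoization.
  def nodePreferredLocations(n: Node): Seq[String] = {
    visits += 1
    n.parents.flatMap(getPreferredLocs)
  }

  // Left-deep chain shaped like rdd2.reduce(_ union _): each level unions the
  // previous result with the same leaf node.
  val leaf = Node(0, Nil)
  val root = (1 to 20).foldLeft(leaf)((acc, i) => Node(i, Seq(acc, leaf)))

  getPreferredLocs(root)
  // 21 distinct nodes, but the visit count roughly doubles per added union.
  println(s"distinct nodes: 21, traversal visits: $visits")
}
{code}
In this model the number of visits roughly doubles with each additional nested union, which mirrors why a chain of about 30 unions in the reproducer above is already enough to stall job submission.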
> DAG Scheduler exhibits exponential runtime with PartitionerAwareUnion
> ---------------------------------------------------------------------
>
>                 Key: SPARK-33356
>                 URL: https://issues.apache.org/jira/browse/SPARK-33356
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.2, 3.0.1
>         Environment: Reproducible locally with 3.0.1, 2.4.2, and latest master.
>            Reporter: Lucas Brutschy
>            Priority: Minor
>
> The current implementation of the {{DAGScheduler}} exhibits exponential runtime in DAGs with many {{PartitionerAwareUnions}}. The reason seems to be a mutual recursion between {{PartitionerAwareUnion.getPreferredLocations}} and {{DAGScheduler.getPreferredLocs}}.
>
> A minimal example reproducing the issue:
> {code:scala}
> object Example extends App {
>   val partitioner = new HashPartitioner(2)
>   val sc = new SparkContext(new SparkConf().setAppName("").setMaster("local[*]"))
>   val rdd1 = sc.emptyRDD[(Int, Int)].partitionBy(partitioner)
>   val rdd2 = (1 to 30).map(_ => rdd1)
>   val rdd3 = rdd2.reduce(_ union _)
>   rdd3.collect()
> }
> {code}
>
> The whole app should take around one second to complete, as no actual work is done. However, it takes more time to submit the job than I am willing to wait.
>
> The underlying cause appears to be mutual recursion between {{PartitionerAwareUnion.getPreferredLocations}} and {{DAGScheduler.getPreferredLocs}}, which restarts graph traversal at each {{PartitionerAwareUnion}} with no memoization. Each node of the DAG is visited {{O(n!)}} (exponentially many) times.
>
> Note that it is clear to me that you could use {{sc.union(rdd2)}} instead of {{rdd2.reduce(_ union _)}} to eliminate the problem; I use the latter only to demonstrate the issue in a sufficiently small example. Given a large DAG and many PartitionerAwareUnions, especially ones constructed by iterative algorithms, the problem can become relevant even without "abuse" of the union operation.
>
> The exponential recursion in the {{DAGScheduler}} was largely fixed with SPARK-682, but in the special case of PartitionerAwareUnion it is still possible. This may actually be an underlying cause of SPARK-29181.
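For completeness, here is a sketch of the workaround the description already mentions, using the same setup as the reproducer ({{WorkaroundSketch}} is an illustrative name, not part of the report):
{code:scala}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Sketch of the workaround mentioned in the description.
object WorkaroundSketch extends App {
  val partitioner = new HashPartitioner(2)
  val sc = new SparkContext(new SparkConf().setAppName("").setMaster("local[*]"))

  val rdd1 = sc.emptyRDD[(Int, Int)].partitionBy(partitioner)
  val rdd2 = (1 to 30).map(_ => rdd1)

  // Problematic form: a left-deep chain of nested partitioner-aware unions,
  // one per reduce step.
  // val slow = rdd2.reduce(_ union _)

  // Workaround: a single flat union over all inputs.
  val fast = sc.union(rdd2)
  fast.collect()

  sc.stop()
}
{code}
A single flat union over all 30 inputs gives the scheduler one union node with 30 parents instead of a chain of roughly 30 nested binary unions, so the preferred-location lookup stays linear in the number of inputs.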