[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yin Huai updated SPARK-6012:
----------------------------
    Target Version/s:   (was: 1.4.0)

Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
---------------------------------------------------------------------------------------

                 Key: SPARK-6012
                 URL: https://issues.apache.org/jira/browse/SPARK-6012
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.2.1
            Reporter: Max Seiden
            Priority: Critical

h3. Summary
I've found that a deadlock occurs when asking for the partitions of a SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs when a child RDD asks the DAGScheduler for preferred partition locations (which locks the scheduler) and eventually hits the #execute() of the TakeOrdered operator, which submits tasks but is blocked when the scheduler, in a separate thread, also tries to get preferred locations. It seems like the TakeOrdered op's #execute() method should not actually submit a job (it is calling #executeCollect() and creating a new RDD) and should instead stay true to its comment and logically apply a Limit on top of a Sort. In my particular case, I am forcing a repartition of a SchemaRDD with a terminal Limit(..., Sort(...)), which is where the CoalescedRDD comes into play.

h3. Stack Traces
h4. Task Submission
{noformat}
"main" prio=5 tid=0x7f8e7280 nid=0x1303 in Object.wait() [0x00010ed5e000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
    at java.lang.Object.wait(Object.java:503)
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
    - locked <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:884)
    at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161)
    at org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183)
    at org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    - locked <0x0007c36ce038> (a org.apache.spark.sql.hive.HiveContext$$anon$7)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
    at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278)
    at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
    at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:79)
    at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333)
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
    - locked <0x0007f55c2238> (a org.apache.spark.scheduler.DAGScheduler)
    at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1148)
    at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175)
{noformat}
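For concreteness, here is a minimal reproduction sketch against the Spark 1.2.x API. The table name {{t}}, the {{Record}} schema, and the data are illustrative assumptions, not details from this report; only the query shape (ORDER BY + LIMIT, planned as a terminal TakeOrdered, followed by a forced repartition) follows the description above.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical schema and data; the report does not include the actual table.
case class Record(key: Int, value: String)

object Spark6012Repro {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("SPARK-6012").setMaster("local[2]"))
    val sql = new SQLContext(sc)
    import sql.createSchemaRDD // implicit RDD[Product] => SchemaRDD in Spark 1.2.x

    sc.parallelize(Seq(Record(1, "a"), Record(2, "b"))).registerTempTable("t")

    // ORDER BY + LIMIT is planned as a terminal TakeOrdered, i.e. Limit(..., Sort(...)).
    val limited = sql.sql("SELECT * FROM t ORDER BY key LIMIT 10")

    // Forcing a repartition wraps the plan's RDD in a CoalescedRDD
    // (on top of a ShuffledRDD, as in the trace above).
    val coalesced = limited.repartition(4)

    // Asking for partitions drives PartitionCoalescer.currPrefLocs ->
    // DAGScheduler.getPreferredLocs, which takes the scheduler lock; computing
    // dependencies then re-enters TakeOrdered.execute(), whose executeCollect()
    // submits a job and waits while main still holds that lock. The scheduler's
    // event thread needs the same lock to place the job's tasks, so per the
    // report this call never returns.
    coalesced.partitions
  }
}
{code}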
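On the suggested fix direction, the following hedged sketch uses only the public RDD API to contrast what #execute() effectively does today (an eager top-k that runs a job during planning) with a logically applied Limit-on-Sort (a lazy top-k that submits nothing until an action runs). The names {{eagerTopK}} and {{lazyTopK}} are illustrative, not Spark APIs, and this is not the actual patch.

{code:scala}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object TopKSketch {
  // Mirrors the eager shape seen in the trace: rdd.takeOrdered(k) is an
  // action, so a job is submitted right here, during plan construction.
  def eagerTopK(sc: SparkContext, rdd: RDD[Int], k: Int): RDD[Int] =
    sc.makeRDD(rdd.takeOrdered(k), numSlices = 1)

  // "Logically apply a Limit on top of a Sort": per-partition top-k, then a
  // single-partition merge. Purely a chain of transformations; no job runs
  // until the consumer invokes an action, so no re-entrant scheduling occurs
  // while partition metadata is being computed.
  def lazyTopK(rdd: RDD[Int], k: Int): RDD[Int] =
    rdd.mapPartitions(it => it.toArray.sorted.take(k).iterator)
      .coalesce(1, shuffle = true)
      .mapPartitions(it => it.toArray.sorted.take(k).iterator)
}
{code}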
[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yin Huai updated SPARK-6012:
----------------------------
    Assignee:   (was: Yin Huai)
[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-6012:
-----------------------------------
    Target Version/s: 1.4.0  (was: 1.3.1, 1.4.0)
[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-6012:
-----------------------------------
    Target Version/s: 1.3.1, 1.4.0  (was: 1.4.0)
[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Armbrust updated SPARK-6012:
------------------------------------
    Target Version/s: 1.4.0
[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Max Seiden updated SPARK-6012:
------------------------------
    Summary: Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator  (was: Deadlock when asking for SchemaRDD partitions with TakeOrdered operator)