[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495618#comment-14495618 ]
Yin Huai commented on SPARK-6012: --------------------------------- It seems the problem is that in {{SchemaRDD}}, we have {code} override protected def getDependencies: Seq[Dependency[_]] = { schema // Force reification of the schema so it is available on executors. List(new OneToOneDependency(queryExecution.toRdd)) } {code} {{queryExecution.toRdd}} may trigger a job execution in some cases (for example, {{TakeOrdered}} here). Starting with 1.3, {{DataFrame}} is not an {{RDD}}. So, I think we will not have this problem anymore. I will test it with our master and see if we can resolve it. > Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered > operator > -------------------------------------------------------------------------------------- > > Key: SPARK-6012 > URL: https://issues.apache.org/jira/browse/SPARK-6012 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 1.2.1 > Reporter: Max Seiden > Priority: Critical > > h3. Summary > I've found that a deadlock occurs when asking for the partitions from a > SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs > when a child RDD asks the DAGScheduler for preferred partition locations > (which locks the scheduler) and eventually hits the #execute() of the > TakeOrdered operator, which submits tasks but is blocked when it also tries > to get preferred locations (in a separate thread). It seems like the > TakeOrdered op's #execute() method should not actually submit a task (it is > calling #executeCollect() and creating a new RDD) and should instead stay > more true to the comment and logically apply a Limit on top of a Sort. > In my particular case, I am forcing a repartition of a SchemaRDD with a > terminal Limit(..., Sort(...)), which is where the CoalescedRDD comes into > play. > h3. Stack Traces > h4. 
Task Submission > {noformat} > "main" prio=5 tid=0x00007f8e72800000 nid=0x1303 in Object.wait() > [0x000000010ed5e000] > java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0x00000007c4c239b8> (a > org.apache.spark.scheduler.JobWaiter) > at java.lang.Object.wait(Object.java:503) > at > org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73) > - locked <0x00000007c4c239b8> (a org.apache.spark.scheduler.JobWaiter) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390) > at org.apache.spark.rdd.RDD.reduce(RDD.scala:884) > at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161) > at > org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183) > at > org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188) > at > org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) > - locked <0x00000007c36ce038> (a > org.apache.spark.sql.hive.HiveContext$$anon$7) > at > org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) > at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127) > at > org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209) > at > org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207) > at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278) > at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) > at > 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) > at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:79) > at > org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80) > at > org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209) > at > org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333) > at > org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304) > - locked <0x00000007f55c2238> (a > org.apache.spark.scheduler.DAGScheduler) > at > org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1148) > at > org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175) > at > org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:192) > at > org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:191) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) > at > org.apache.spark.rdd.PartitionCoalescer$LocationIterator.<init>(CoalescedRDD.scala:186) > at > org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:237) > at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:338) > at > org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:84) > at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) > at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) > at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) > at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) > {noformat} > h4. 
Submitted Task > {noformat} > "sparkDriver-akka.actor.default-dispatcher-2" daemon prio=5 > tid=0x00007f8e72248800 nid=0x5903 waiting for monitor entry > [0x000000012214c000] > java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304) > - waiting to lock <0x00000007f55c2238> (a > org.apache.spark.scheduler.DAGScheduler) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:853) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:852) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:852) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780) > at > org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org