[ 
https://issues.apache.org/jira/browse/SPARK-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-4454:
-----------------------------
    Labels:   (was: backport-needed)

> Race condition in DAGScheduler
> ------------------------------
>
>                 Key: SPARK-4454
>                 URL: https://issues.apache.org/jira/browse/SPARK-4454
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.1.0
>            Reporter: Rafal Kwasny
>            Assignee: Josh Rosen
>            Priority: Critical
>             Fix For: 1.3.0
>
>
> There appears to be a race condition in DAGScheduler that manifests in jobs with
> high concurrency:
> {noformat}
>  Exception in thread "main" java.util.NoSuchElementException: key not found: 35
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:58)
>         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>         at 
> org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:201)
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1292)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1304)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1304)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1304)
>         at 
> org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1275)
>         at 
> org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:937)
>         at 
> org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175)
>         at 
> org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:192)
>         at 
> org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:191)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>         at 
> org.apache.spark.rdd.PartitionCoalescer$LocationIterator.next(CoalescedRDD.scala:203)
>         at 
> org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:257)
>         at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:338)
>         at 
> org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:84)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1150)
>         at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
>         at me.wwsga.driveclub.EnhancedRDD.saveAsPartitioned(Enhanced.scala:53)
>         at 
> Import$$anonfun$22$$anonfun$apply$9$$anonfun$apply$10.apply(Import.scala:186)
>         at 
> Import$$anonfun$22$$anonfun$apply$9$$anonfun$apply$10.apply(Import.scala:181)
>         at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Code:
> {noformat}
>   private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
>     if (!cacheLocs.contains(rdd.id)) {
>       val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, 
> index)).toArray[BlockId]
>       val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, 
> blockManagerMaster)
>       cacheLocs(rdd.id) = blockIds.map { id =>
>         locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, 
> bm.executorId))
>       }
>     }
>     cacheLocs(rdd.id)
>   }
> {noformat}
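> The check-then-read sequence above (`cacheLocs.contains` followed by
> `cacheLocs(rdd.id)`) is not atomic. If another thread removes the entry (for
> example, by clearing `cacheLocs`) between the two calls, the final lookup throws
> `NoSuchElementException`. The getOrElseUpdate pattern, which returns the computed
> value directly instead of re-reading the map, would probably be better for this code.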



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to