[ https://issues.apache.org/jira/browse/SPARK-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen updated SPARK-4454: ----------------------------- Component/s: Scheduler > Race condition in DAGScheduler > ------------------------------ > > Key: SPARK-4454 > URL: https://issues.apache.org/jira/browse/SPARK-4454 > Project: Spark > Issue Type: Bug > Components: Scheduler > Affects Versions: 1.1.0 > Reporter: Rafal Kwasny > Priority: Minor > > There seems to be a race condition in DAGScheduler that manifests on jobs with > high concurrency: > {noformat} > Exception in thread "main" java.util.NoSuchElementException: key not found: > 35 > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.mutable.HashMap.apply(HashMap.scala:64) > at > org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:201) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1292) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304) > at scala.collection.immutable.List.foreach(List.scala:318) > at > 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1304) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1304) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306) > at > 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1304) > at > org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1275) > at > org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:937) > at > org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175) > at > org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:192) > at > org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:191) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) > at > org.apache.spark.rdd.PartitionCoalescer$LocationIterator.next(CoalescedRDD.scala:203) > at > org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:257) > at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:338) > at > org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:84) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1150) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995) > at me.wwsga.driveclub.EnhancedRDD.saveAsPartitioned(Enhanced.scala:53) > at > Import$$anonfun$22$$anonfun$apply$9$$anonfun$apply$10.apply(Import.scala:186) > at > 
Import$$anonfun$22$$anonfun$apply$9$$anonfun$apply$10.apply(Import.scala:181) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > Code: > {noformat} > private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = { > if (!cacheLocs.contains(rdd.id)) { > val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, > index)).toArray[BlockId] > val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, > blockManagerMaster) > cacheLocs(rdd.id) = blockIds.map { id => > locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, > bm.executorId)) > } > } > cacheLocs(rdd.id) > } > {noformat} > A getOrElseUpdate pattern would probably be better for this code. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org