[ https://issues.apache.org/jira/browse/SPARK-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrew Or resolved SPARK-14252.
-------------------------------
    Resolution: Fixed
      Assignee: Eric Liang
 Fix Version/s: 2.0.0

> Executors do not try to download remote cached blocks
> -----------------------------------------------------
>
>                 Key: SPARK-14252
>                 URL: https://issues.apache.org/jira/browse/SPARK-14252
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Marcelo Vanzin
>            Assignee: Eric Liang
>             Fix For: 2.0.0
>
> I noticed this when taking a look at the root cause of SPARK-14209. 2.0.0
> includes SPARK-12817, which changed the caching code a bit to remove
> duplication, but it seems to have removed the part where executors check
> whether other executors hold the needed cached block.
>
> In 1.6, that check was the call to {{BlockManager.get}} in
> {{CacheManager.getOrCompute}}. In the new version, {{RDD.iterator}} calls
> {{BlockManager.getOrElseUpdate}}, which never calls {{BlockManager.get}},
> so an executor never sees blocks that are cached on other executors and
> recomputes them locally instead.
>
> I wrote a small program that shows this. In 1.6, running with
> {{--num-executors 2}}, I get 5 blocks cached on each executor, and
> messages like these in the logs:
> {noformat}
> 16/03/29 13:18:01 DEBUG spark.CacheManager: Looking for partition rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting local block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Block rdd_0_7 not registered locally
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7 from BlockManagerId(1, blah, 58831)
> {noformat}
> On 2.0, I get (almost) all 10 partitions cached on *both* executors,
> because once the second executor fails to find a block locally it just
> recomputes the block and caches it. It never tries to download the block
> from the other executor. The log messages above, which still exist in
> the code, never show up.
>
> Here's the code I used for the above (trimmed of some other stuff from
> my little test harness; the imports and a minimal {{SparkConf}} stand-in
> are filled in here so it runs standalone):
> {code}
> import java.util.concurrent.TimeUnit
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> // Minimal stand-in for the conf built by the trimmed harness code.
> val conf = new SparkConf().setAppName("remote-cache-repro")
> val sc = new SparkContext(conf)
> try {
>   val rdd = sc.parallelize(1 to 10000000, 10)
>   rdd.cache()
>   rdd.count()
>
>   // Create a single task that will sleep and block, so that a particular
>   // executor is busy. This should force future tasks to download cached
>   // data from that executor.
>   println("Running sleep job..")
>   val thread = new Thread(new Runnable() {
>     override def run(): Unit = {
>       rdd.mapPartitionsWithIndex { (i, iter) =>
>         if (i == 0) {
>           Thread.sleep(TimeUnit.MINUTES.toMillis(10))
>         }
>         iter
>       }.count()
>     }
>   })
>   thread.setDaemon(true)
>   thread.start()
>
>   // Wait a few seconds to make sure the task is running (too lazy for listeners).
>   println("Waiting for tasks to start...")
>   TimeUnit.SECONDS.sleep(10)
>
>   // Now run a job that touches everything and should use the cached data.
>   val cnt = rdd.map(_ * 2).count()
>   println(s"Counted $cnt elements.")
>
>   println("Killing sleep job.")
>   thread.interrupt()
>   thread.join()
> } finally {
>   sc.stop()
> }
> {code}
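For anyone following along: the lookup order the report describes is local store, then remote executors, then recompute-and-cache. Below is a self-contained toy sketch of that order in plain Scala. It is not Spark's actual {{BlockManager}} code; every name in it ({{Store}}, {{get}}, {{getOrElseUpdate}}) is illustrative only.

{code}
import scala.collection.mutable

// Toy model of the lookup order described in the report above.
// These are NOT Spark's real BlockManager APIs; names are illustrative.
object CacheLookupSketch {
  type BlockId = String

  // Stand-in for one executor's block store.
  final class Store(val name: String) {
    val blocks = mutable.Map.empty[BlockId, Seq[Int]]
  }

  // Mirrors the 1.6 behavior of BlockManager.get: check the local store
  // first, then each remote peer.
  def get(id: BlockId, local: Store, remotes: Seq[Store]): Option[Seq[Int]] =
    local.blocks.get(id).orElse {
      remotes.collectFirst {
        case r if r.blocks.contains(id) =>
          println(s"Getting remote block $id from ${r.name}")
          r.blocks(id)
      }
    }

  // The lookup order the regression lost: consult get() (local AND remote)
  // before recomputing. The 2.0.0 code skipped straight to recomputation
  // whenever the local lookup missed.
  def getOrElseUpdate(id: BlockId, local: Store, remotes: Seq[Store])
                     (compute: => Seq[Int]): Seq[Int] =
    get(id, local, remotes).getOrElse {
      println(s"Recomputing $id locally")
      val block = compute
      local.blocks(id) = block // cache the result locally, as rdd.cache() would
      block
    }

  def main(args: Array[String]): Unit = {
    val exec1 = new Store("executor-1")
    val exec2 = new Store("executor-2")
    exec1.blocks("rdd_0_7") = 1 to 10

    // executor-2 misses locally; with the correct lookup order it fetches
    // the block from executor-1 and never runs the recompute thunk.
    val block = getOrElseUpdate("rdd_0_7", exec2, Seq(exec1)) {
      sys.error("should have been fetched remotely, not recomputed")
    }
    println(s"Got ${block.size} elements")
  }
}
{code}

Running it prints "Getting remote block rdd_0_7 from executor-1" and never reaches the recompute branch; that local-then-remote ordering is what the fix resolved here needs to restore inside {{BlockManager.getOrElseUpdate}}.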