Marcelo Vanzin created SPARK-14252:
--------------------------------------

             Summary: Executors do not try to download remote cached blocks
                 Key: SPARK-14252
                 URL: https://issues.apache.org/jira/browse/SPARK-14252
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.0.0
            Reporter: Marcelo Vanzin


I noticed this when taking a look at the root cause of SPARK-14209. 2.0.0 
includes SPARK-12817, which changed the caching code a bit to remove 
duplication. But it seems to have removed the part where executors check 
whether other executors contain the needed cached block.

In 1.6, that was done by the call to {{BlockManager.get}} in 
{{CacheManager.getOrCompute}}. But in the new version, {{RDD.iterator}} calls 
{{BlockManager.getOrElseUpdate}}, which never calls {{BlockManager.get}}, so an 
executor never fetches blocks that are cached on other executors; instead, the 
blocks are recomputed locally.
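
To make the regression concrete, here's a small self-contained sketch of the two lookup paths. This is a toy model, not Spark's actual classes: the two maps standing in for the local and remote block stores, and the names {{BlockLookupSketch}}, {{getOrCompute}}, and {{getOrElseUpdateLocalOnly}}, are invented for illustration.

{code}
import scala.collection.mutable

// Toy model of the two lookup paths; not Spark's real BlockManager code.
object BlockLookupSketch {
  val localBlocks = mutable.Map[String, Int]()  // blocks cached on this executor
  val remoteBlocks = Map("rdd_0_7" -> 42)       // blocks cached on another executor
  var recomputes = 0

  // 1.6-style path: BlockManager.get checks local storage, then remote
  // executors, and only recomputes (and caches) on a total miss.
  def getOrCompute(id: String)(compute: => Int): Int =
    localBlocks.get(id).orElse(remoteBlocks.get(id)).getOrElse {
      recomputes += 1
      val v = compute
      localBlocks(id) = v
      v
    }

  // 2.0-style path as described in this bug: only the local store is
  // consulted, so a block cached remotely is silently recomputed.
  def getOrElseUpdateLocalOnly(id: String)(compute: => Int): Int =
    localBlocks.getOrElseUpdate(id, { recomputes += 1; compute })

  def main(args: Array[String]): Unit = {
    println(getOrCompute("rdd_0_7")(-1))             // found remotely: prints 42
    println(recomputes)                              // prints 0
    localBlocks.clear()
    println(getOrElseUpdateLocalOnly("rdd_0_7")(-1)) // recomputed: prints -1
    println(recomputes)                              // prints 1
  }
}
{code}

With the 1.6-style path the remote copy is used and nothing is recomputed; with the local-only path the same block is recomputed and cached again, which is exactly the duplicate caching described below.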

I wrote a small program that shows this. In 1.6, running with {{--num-executors 
2}}, I get 5 blocks cached on each executor, and messages like these in the 
logs:

{noformat}
16/03/29 13:18:01 DEBUG spark.CacheManager: Looking for partition rdd_0_7
16/03/29 13:18:01 DEBUG storage.BlockManager: Getting local block rdd_0_7
16/03/29 13:18:01 DEBUG storage.BlockManager: Block rdd_0_7 not registered locally
16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7
16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7 from BlockManagerId(1, blah, 58831)
{noformat}

On 2.0, I get (almost) all 10 partitions cached on *both* executors, because 
once the second executor fails to find a block locally, it just recomputes and 
caches it. It never tries to download the block from the other executor. The 
log messages above, which still exist in the code, don't show up anywhere.


Here's the code I used for the above (trimmed of some other stuff from my 
little test harness, so might not compile as is):

{code}
    import java.util.concurrent.TimeUnit

    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 10000000, 10)
      rdd.cache()
      rdd.count()

      // Create a single task that will sleep and block, so that a particular
      // executor is busy. This should force future tasks to download cached
      // data from that executor.
      println("Running sleep job...")
      val thread = new Thread(new Runnable() {
        override def run(): Unit = {
          rdd.mapPartitionsWithIndex { (i, iter) =>
            if (i == 0) {
              Thread.sleep(TimeUnit.MINUTES.toMillis(10))
            }
            iter
          }.count()
        }
      })
      thread.setDaemon(true)
      thread.start()

      // Wait a few seconds to make sure the task is running (too lazy for listeners).
      println("Waiting for tasks to start...")
      TimeUnit.SECONDS.sleep(10)

      // Now run a job that will touch everything and should use the cached data.
      val cnt = rdd.map(_ * 2).count()
      println(s"Counted $cnt elements.")

      println("Killing sleep job.")
      thread.interrupt()
      thread.join()
    } finally {
      sc.stop()
    }
{code}


