
Wenchen Fan reassigned SPARK-30786:
-----------------------------------

    Assignee: Prakhar Jain

> Block replication is not retried on other BlockManagers when it fails on 1 of 
> the peers
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-30786
>                 URL: https://issues.apache.org/jira/browse/SPARK-30786
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.3.4, 2.4.5, 3.0.0
>            Reporter: Prakhar Jain
>            Assignee: Prakhar Jain
>            Priority: Major
>
> When we cache an RDD with replication > 1, the RDD block is first cached 
> locally on one of the BlockManagers and then replicated to (replication - 1) 
> other BlockManagers. While replicating a block, if replication fails on one 
> of the peers, it is supposed to be retried on some other peer (governed by 
> the "spark.storage.maxReplicationFailures" config). But currently this retry 
> does not happen.
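> The expected behaviour, as I understand it, is roughly the following. This is 
> a simplified sketch in plain Scala, not Spark's actual code; Peer, uploadTo 
> and replicateWithRetry are hypothetical stand-ins for the BlockManager 
> internals and the transport layer:
> {noformat}
> // Hypothetical model of "replicate to (replication - 1) peers, retrying
> // failed peers on other candidates, bounded by maxReplicationFailures".
> case class Peer(id: Int)
>
> def replicateWithRetry(
>     peers: Seq[Peer],
>     numReplicas: Int,              // replication - 1
>     maxReplicationFailures: Int,   // "spark.storage.maxReplicationFailures"
>     uploadTo: Peer => Boolean      // true only if the peer really stored the block
> ): Seq[Peer] = {
>   var failures = 0
>   var replicated = Seq.empty[Peer]
>   var candidates = peers
>   while (replicated.size < numReplicas &&
>          failures <= maxReplicationFailures &&
>          candidates.nonEmpty) {
>     val peer = candidates.head
>     candidates = candidates.tail
>     if (uploadTo(peer)) {
>       replicated :+= peer
>     } else {
>       // A peer that failed to store the block must count as a failure so
>       // that replication moves on to another candidate.
>       failures += 1
>     }
>   }
>   replicated
> }
> {noformat}
> In the scenario below, the upload is reported as successful even though the 
> peer failed to store the block, so the failure branch is never taken.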
> Logs of the executor that is trying to replicate the block:
> {noformat}
> 20/02/10 09:01:47 INFO Executor: Starting executor ID 1 on host 
> wn11-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net
> .
> .
> .
> 20/02/10 09:06:45 INFO Executor: Running task 244.0 in stage 3.0 (TID 550)
> 20/02/10 09:06:45 DEBUG BlockManager: Getting local block rdd_13_244
> 20/02/10 09:06:45 DEBUG BlockManager: Block rdd_13_244 was not found
> 20/02/10 09:06:45 DEBUG BlockManager: Getting remote block rdd_13_244
> 20/02/10 09:06:45 DEBUG BlockManager: Block rdd_13_244 not found
> 20/02/10 09:06:46 INFO MemoryStore: Block rdd_13_244 stored as values in 
> memory (estimated size 33.3 MB, free 44.2 MB)
> 20/02/10 09:06:46 DEBUG BlockManager: Told master about block rdd_13_244
> 20/02/10 09:06:46 DEBUG BlockManager: Put block rdd_13_244 locally took  947 
> ms
> 20/02/10 09:06:46 DEBUG BlockManager: Level for block rdd_13_244 is 
> StorageLevel(memory, deserialized, 3 replicas)
> 20/02/10 09:06:46 TRACE BlockManager: Trying to replicate rdd_13_244 of 
> 34908552 bytes to BlockManagerId(2, 
> wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None)
> 20/02/10 09:06:47 TRACE BlockManager: Replicated rdd_13_244 of 34908552 bytes 
> to BlockManagerId(2, 
> wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None) 
> in 205.849858 ms
> 20/02/10 09:06:47 TRACE BlockManager: Trying to replicate rdd_13_244 of 
> 34908552 bytes to BlockManagerId(5, 
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None)
> 20/02/10 09:06:47 TRACE BlockManager: Replicated rdd_13_244 of 34908552 bytes 
> to BlockManagerId(5, 
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None) 
> in 180.501504 ms
> 20/02/10 09:06:47 DEBUG BlockManager: Replicating rdd_13_244 of 34908552 
> bytes to 2 peer(s) took 387.381168 ms
> 20/02/10 09:06:47 DEBUG BlockManager: block rdd_13_244 replicated to 
> BlockManagerId(5, 
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None), 
> BlockManagerId(2, 
> wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None)
> 20/02/10 09:06:47 DEBUG BlockManager: Put block rdd_13_244 remotely took  423 
> ms
> 20/02/10 09:06:47 DEBUG BlockManager: Putting block rdd_13_244 with 
> replication took  1371 ms
> 20/02/10 09:06:47 DEBUG BlockManager: Getting local block rdd_13_244
> 20/02/10 09:06:47 DEBUG BlockManager: Level for block rdd_13_244 is 
> StorageLevel(memory, deserialized, 3 replicas)
> 20/02/10 09:06:47 INFO Executor: Finished task 244.0 in stage 3.0 (TID 550). 
> 2253 bytes result sent to driver
> {noformat}
> Logs of the other executor that the block is being replicated to:
> {noformat}
> 20/02/10 09:01:47 INFO Executor: Starting executor ID 5 on host 
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net
> .
> .
> .
> 20/02/10 09:06:47 INFO MemoryStore: Will not store rdd_13_244
> 20/02/10 09:06:47 WARN MemoryStore: Not enough space to cache rdd_13_244 in 
> memory! (computed 4.2 MB so far)
> 20/02/10 09:06:47 INFO MemoryStore: Memory use = 4.9 GB (blocks) + 7.3 MB 
> (scratch space shared across 2 tasks(s)) = 4.9 GB. Storage limit = 4.9 GB.
> 20/02/10 09:06:47 DEBUG BlockManager: Put block rdd_13_244 locally took  12 ms
> 20/02/10 09:06:47 WARN BlockManager: Block rdd_13_244 could not be removed as 
> it was not found on disk or in memory
> 20/02/10 09:06:47 WARN BlockManager: Putting block rdd_13_244 failed
> 20/02/10 09:06:47 DEBUG BlockManager: Putting block rdd_13_244 without 
> replication took  13 ms
> {noformat}
> Note that the block replication failed on Executor-5 with the log line "Not 
> enough space to cache rdd_13_244 in memory!". But Executor-1 reports that the 
> block was successfully replicated to Executor-5 - "Replicated rdd_13_244 of 
> 34908552 bytes to BlockManagerId(5, 
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None)" 
> - and so it never retries the replication on some other executor.
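> Because the failed put is reported back as a success, raising the retry 
> budget presumably does not help either. For completeness, this is how that 
> budget would be set (illustration only; the default appears to be 1):
> {noformat}
> import org.apache.spark.SparkConf
>
> // "spark.storage.maxReplicationFailures": how many failed peers are tolerated
> // per block before replication gives up on finding another candidate.
> val conf = new SparkConf()
>   .set("spark.storage.maxReplicationFailures", "3")
> {noformat}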
> Sample code:
> {noformat}
> sc.setLogLevel("INFO")
> // Build a random string of the given length.
> def randomString(length: Int) = {
>   val r = new scala.util.Random
>   val sb = new StringBuilder
>   for (i <- 1 to length) { sb.append(r.nextPrintableChar) }
>   sb.toString
> }
>
> // 300 partitions of large random strings, cached in memory with 3 replicas
> // (run in spark-shell, where spark.implicits._ provides .toDF).
> val df = sc.parallelize(1 to 300000, 300).map { x => randomString(100000) }.toDF
> import org.apache.spark.storage.StorageLevel
> df.persist(StorageLevel(false, true, false, true, 3))
> df.count()
> {noformat}


