[ https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-17951:
------------------------------
    Description: 
The following code demonstrates the issue:

{code}
import java.nio.ByteBuffer

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}

object BMTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BMTest")
    val size = 3344570
    val sc = new SparkContext(conf)

    val data = sc.parallelize(1 to 100, 8)
    var accum = sc.accumulator(0.0, "get remote bytes")
    var i = 0
    while (i < 91) {
      accum = sc.accumulator(0.0, "get remote bytes")

      // First job: each of the 8 partitions stores one `size`-byte block
      // in its local BlockManager.
      data.mapPartitionsWithIndex { (pid, iter) =>
        val bm = SparkEnv.get.blockManager
        val blockId = TaskResultBlockId(10 * i + pid)
        val bytes = new Array[Byte](size)
        Random.nextBytes(bytes)
        val buffer = ByteBuffer.allocate(size)
        buffer.put(bytes)
        buffer.flip()
        bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
        Iterator(1)
      }.count()

      // Second job: each partition fetches the 8 blocks written above,
      // in parallel through 8 Futures, and records the elapsed time.
      data.mapPartitionsWithIndex { (pid, iter) =>
        val before = System.nanoTime()
        val bm = SparkEnv.get.blockManager
        (0 to 7).map { s =>
          Future {
            bm.getRemoteBytes(TaskResultBlockId(10 * i + s))
          }
        }.foreach(Await.result(_, Duration.Inf))
        accum.add((System.nanoTime() - before) / 1e9)
        Iterator(1)
      }.count()

      println("get remote bytes take: " + accum.value / 8)
      i += 1
    }
  }
}
{code}


In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while in Spark 1.5.1 it is 0.09 s.

However, if the blocks are fetched in a single thread instead, the gap is much smaller:
Spark 1.6.2: get remote bytes takes 0.21 s
Spark 1.5.1: get remote bytes takes 0.20 s
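
For reference, the single-thread numbers above were presumably measured by replacing the Future-based fetch in the second job with a plain sequential loop. A minimal sketch of that variant (reusing `data`, `accum` and `i` from the code above; the exact harness used for that measurement is not shown here) would be:

{code}
// Hypothetical single-threaded variant of the fetch job: the 8 blocks are
// fetched one after another instead of through 8 concurrent Futures.
data.mapPartitionsWithIndex { (pid, iter) =>
  val before = System.nanoTime()
  val bm = SparkEnv.get.blockManager
  (0 to 7).foreach { s =>
    bm.getRemoteBytes(TaskResultBlockId(10 * i + s))
  }
  accum.add((System.nanoTime() - before) / 1e9)
  Iterator(1)
}.count()
{code}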



> BlockFetch with multiple threads slows down after Spark 1.6
> -----------------------------------------------------------
>
>                 Key: SPARK-17951
>                 URL: https://issues.apache.org/jira/browse/SPARK-17951
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 1.6.2
>         Environment: cluster with 8 nodes, each with 28 cores; 10Gb network
>            Reporter: ding
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
