[ https://issues.apache.org/jira/browse/SPARK-2521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin updated SPARK-2521:
-------------------------------

    Description: 
Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains the 
closures) over Akka along with the task itself to the executors. This is 
inefficient because all tasks in the same stage use the same RDD object, yet 
we send the RDD object to the executors once per task. It is especially bad 
when a closure references a very large variable. The current design has forced 
users to explicitly broadcast large variables themselves.
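
For concreteness, here is the kind of workaround the current design pushes users toward (an illustrative spark-shell snippet, not code from the patch; `lookup` and `bcLookup` are made-up names):

{code}
// Illustrative only; assumes a spark-shell session where sc is the SparkContext.
// Without this change, the 100 MB array below is re-serialized into every
// task's closure, so users end up broadcasting it by hand.
val lookup = new Array[Byte](100 * 1000 * 1000)

// Naive version: each of the 1000 tasks ships its own copy of `lookup`.
sc.parallelize(1 to 1000, 1000).map { x => lookup.length + x }.count()

// Manual workaround: broadcast once, reference the small handle in the closure.
val bcLookup = sc.broadcast(lookup)
sc.parallelize(1 to 1000, 1000).map { x => bcLookup.value.length + x }.count()
{code}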

The patch uses broadcast to send RDD objects and closures to the executors, and 
uses Akka to send only a reference to the broadcast RDD/closure along with the 
partition-specific information for each task. For those of you who know more 
about the internals, Spark already relies on broadcast to send the Hadoop 
JobConf every time it reads Hadoop input, because the JobConf is large.
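
To make the dispatch scheme concrete, here is a toy model in plain Scala. This is not Spark's actual internals; `BigClosure`, `TaskRef`, and `broadcastStore` are invented names, and plain Java serialization stands in for Spark's closure serializer:

{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object DispatchSketch {
  // Stand-in for Spark's closure serializer.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  // Stand-in for an RDD closure that captures 1 MB of state.
  case class BigClosure(table: Array[Byte]) extends (Int => Int) {
    def apply(x: Int): Int = table(x % table.length).toInt
  }

  // Under the new scheme, a task carries only a broadcast handle plus its
  // partition id instead of the full closure bytes.
  case class TaskRef(broadcastId: Long, partition: Int)

  def main(args: Array[String]): Unit = {
    val closure = BigClosure(new Array[Byte](1000 * 1000))

    // Old scheme: the full closure bytes travel with every task.
    val oldPerTask = serialize(closure).length

    // New scheme: publish the closure bytes once (stand-in for a broadcast
    // variable), then ship only the small reference per task.
    val broadcastStore = Map(0L -> serialize(closure))
    val newPerTask = serialize(TaskRef(0L, 42)).length

    println(s"per-task bytes -- old: $oldPerTask, new: $newPerTask")
    println(s"shared bytes fetched once per executor: ${broadcastStore(0L).length}")
  }
}
{code}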

The user-facing impact of the change includes:

- Users won't need to decide what to broadcast anymore, unless they want to 
reuse a large object multiple times in different operations.
- Task size will get smaller, resulting in faster scheduling and higher task 
dispatch throughput.

In addition, the change will simplify some Spark internals, eliminating the 
need to maintain task caches and the complex logic for broadcasting the 
JobConf (which also led to a deadlock recently).

A simple way to test this:

{code}
// Run in spark-shell: both closures capture the 1 MB array `a`, so under the
// old scheme it is re-serialized into every one of the 1000 tasks per stage.
val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a);
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
{code}

Numbers on 3 r3.8xlarge instances on EC2:

master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s

  was:
This can substantially reduce task size, and also makes it possible to support 
very large closures (e.g. closures that reference large variables).

Once this is in, we can also remove broadcasting the Hadoop JobConf.


> Broadcast RDD object once per TaskSet (instead of sending it for every task)
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-2521
>                 URL: https://issues.apache.org/jira/browse/SPARK-2521
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>



--
This message was sent by Atlassian JIRA
(v6.2#6252)
