[jira] [Commented] (SPARK-19108) Broadcast all shared parts of tasks (to reduce task serialization time)

2017-01-09 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812130#comment-15812130
 ] 

Imran Rashid commented on SPARK-19108:
--

Yeah, great point.  I was thinking of two broadcasts just to minimize code 
changes, but this would already be a big enough refactor that it makes sense 
to combine them into one.

> Broadcast all shared parts of tasks (to reduce task serialization time)
> ---
>
> Key: SPARK-19108
> URL: https://issues.apache.org/jira/browse/SPARK-19108
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Reporter: Kay Ousterhout
>
> Expand the amount of information that's broadcasted for tasks, to avoid 
> serializing data per-task that should only be sent to each executor once for 
> the entire stage.
> Conceptually, this means we'd have new classes specifically for sending the 
> minimal necessary data to the executor, like:
> {code}
> import java.util.Properties
> import org.apache.spark.broadcast.Broadcast
>
> /**
>   * Metadata about the taskset needed by the executor for all tasks in this taskset.
>   * Subset of the full data kept on the driver, to make it faster to serialize and
>   * send to executors.
>   */
> class ExecutorTaskSetMeta(
>   val stageId: Int,
>   val stageAttemptId: Int,
>   val properties: Properties,
>   val addedFiles: Map[String, String],
>   val addedJars: Map[String, String]
>   // maybe task metrics here?
> )
> class ExecutorTaskData(
>   val partitionId: Int,
>   val attemptNumber: Int,
>   val taskId: Long,
>   val taskBinary: Broadcast[Array[Byte]],
>   val taskSetMeta: Broadcast[ExecutorTaskSetMeta]
> )
> {code}
> Then all the info you'd need to send to the executors would be a serialized 
> version of ExecutorTaskData.  Furthermore, given the simplicity of that 
> class, you could serialize manually, and then for each task you could just 
> modify the first two ints & one long directly in the byte buffer.  (You could 
> do the same trick for serialization even if ExecutorTaskSetMeta was not a 
> broadcast, but that will keep the msgs small as well.)
> There are a bunch of details I'm skipping here: you'd also need to do some 
> special handling for the TaskMetrics; the way tasks get started in the 
> executor would change; you'd also need to refactor {{Task}} to let it get 
> reconstructed from this information (or add more to ExecutorTaskSetMeta); and 
> probably other details I'm overlooking now.
> (this is copied from SPARK-18890 and [~imranr]'s comment there; cc 
> [~shivaram])
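The byte-buffer trick described in the issue can be sketched roughly as follows. This is a minimal illustration, not Spark's actual wire format: the object name `TaskHeaderPatching`, the header layout, and the opaque `sharedPayload` (standing in for the serialized broadcast handles) are all assumptions. It only works when everything after the header is identical for every task in the task set.

```scala
import java.nio.ByteBuffer

// Hypothetical sketch: names and byte layout are assumptions, not Spark's format.
object TaskHeaderPatching {
  // Fixed-width per-task header: partitionId (4) + attemptNumber (4) + taskId (8).
  val HeaderSize: Int = 4 + 4 + 8

  /** Build the template once per task set: a zeroed header, then the shared bytes. */
  def makeTemplate(sharedPayload: Array[Byte]): ByteBuffer = {
    val buf = ByteBuffer.allocate(HeaderSize + sharedPayload.length)
    buf.position(HeaderSize)
    buf.put(sharedPayload)
    buf.rewind()
    buf
  }

  /** Overwrite only the per-task fields; absolute puts do not move the position. */
  def patch(template: ByteBuffer, partitionId: Int, attemptNumber: Int, taskId: Long): ByteBuffer = {
    template.putInt(0, partitionId)
    template.putInt(4, attemptNumber)
    template.putLong(8, taskId)
    template
  }

  /** Read the per-task fields back, as an executor-side decoder might. */
  def readHeader(buf: ByteBuffer): (Int, Int, Long) =
    (buf.getInt(0), buf.getInt(4), buf.getLong(8))
}
```

The template is built once per task set; each task launch then costs three fixed-offset writes rather than a full serialization pass.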



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19108) Broadcast all shared parts of tasks (to reduce task serialization time)

2017-01-09 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812127#comment-15812127
 ] 

Imran Rashid commented on SPARK-19108:
--

On SPARK-18890, [~gq] pointed out something really important I was forgetting 
about:

bq. OK, I see. In fact, the most difficult thing to deal with is the partition, 
including HadoopPartition, JDBCPartition, BlockRDDPartition, and even other 
user-defined classes. We can only use the Java serializer to serialize 
instances of these classes.

I had only been thinking of RDDs where the partition didn't have any useful 
information for the executor, e.g. ShuffledRDD and MapPartitionsRDD, saving to 
hadoop, etc.  So that trick of just updating a couple of bytes directly in the 
byte buffer won't work in general.

A couple more ideas that I had after exploring this a bit more -- these are 
pretty research-y, so I don't think they merit a specific jira or any immediate 
action until someone measures whether they would actually have an impact or 
would just be unnecessary optimizations:

1) If we do the above, it might be interesting if the api for {{Partition}} (or 
maybe {{RDD}}) was extended so specific classes could indicate whether the 
partition actually needed to be serialized, and skip it when possible.

2) The partition serialization seems like it could be made a lot more 
efficient, if it wasn't done independently for each task.  For example, for 
hadoop RDD, all the cases I know of when there are tons of paths generated tend 
to look like

{noformat}
hdfs:///some/super/long/base/path/partColA=X/year=2016/month=01/day=17/part-1
...
hdfs:///some/super/long/base/path/partColA=X/year=2016/month=04/day=14/part-5
...
{noformat}

If you were serializing each of these independently, there isn't much you can 
do, but for serializing all of them, even a pretty simple encoding could be way 
more efficient.  And even more than just the paths, there is even more repeated 
info buried in 
[FileSplit|https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java#L42].
  In MapReduce, they all have to be serialized independently since each use is 
a separate JVM, but Spark can do better.
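As a rough illustration of the "pretty simple encoding" idea, here is a minimal sketch of front coding, where each path stores only the length of the prefix it shares with the previous path plus the remaining suffix. Front coding is just one candidate technique, not something proposed in this thread, and `PathFrontCoding` and `Entry` are hypothetical names, not Spark or Hadoop APIs.

```scala
// Hypothetical sketch: front coding over a batch of paths. Not a Spark or Hadoop API.
object PathFrontCoding {
  /** How many leading chars this path shares with the previous one, plus the rest. */
  final case class Entry(sharedPrefixLen: Int, suffix: String)

  private def commonPrefixLen(a: String, b: String): Int = {
    val max = math.min(a.length, b.length)
    var i = 0
    while (i < max && a.charAt(i) == b.charAt(i)) i += 1
    i
  }

  /** Encode the whole batch at once; the first entry always has a shared prefix of 0. */
  def encode(paths: Seq[String]): Vector[Entry] = {
    var prev = ""
    paths.iterator.map { p =>
      val n = commonPrefixLen(prev, p)
      prev = p
      Entry(n, p.substring(n))
    }.toVector
  }

  /** Rebuild each path from the previous decoded path and the stored suffix. */
  def decode(entries: Seq[Entry]): Vector[String] = {
    var prev = ""
    entries.iterator.map { e =>
      val p = prev.substring(0, e.sharedPrefixLen) + e.suffix
      prev = p
      p
    }.toVector
  }
}
```

For the two example paths above, the second entry would carry only the tail starting at the differing month digit, since everything before it is shared with the first path. Whether this saves enough to matter is exactly what would need measuring.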

Similarly, our [KafkaRDDPartition| 
https://github.com/apache/spark/blob/master/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala#L31]
 has {{topic}} and {{host}}, which probably get repeated a lot (though {{host}} 
is dropped in the new [kafka010 version| 
https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala#L32]).
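For repeated fields like {{topic}} and {{host}}, one simple option is a per-batch string dictionary: each distinct string is stored once and every partition refers to it by index. The sketch below uses a simplified stand-in class `Part`, not Spark's actual KafkaRDDPartition, and all names in it are assumptions for illustration.

```scala
// Hypothetical sketch: dictionary-encode repeated strings across a batch of partitions.
object PartitionDictionaryEncoding {
  // Simplified stand-in for a Kafka-style partition; not Spark's KafkaRDDPartition.
  final case class Part(topic: String, partition: Int, host: String)

  // Batch form: distinct strings stored once; rows hold (topicIdx, partition, hostIdx).
  final case class Encoded(strings: Vector[String], rows: Vector[(Int, Int, Int)])

  def encode(parts: Seq[Part]): Encoded = {
    // distinct keeps first-occurrence order, so indices are stable for a given input.
    val strings = parts.flatMap(p => Seq(p.topic, p.host)).distinct.toVector
    val index = strings.zipWithIndex.toMap
    val rows = parts.map(p => (index(p.topic), p.partition, index(p.host))).toVector
    Encoded(strings, rows)
  }

  def decode(e: Encoded): Vector[Part] =
    e.rows.map { case (t, p, h) => Part(e.strings(t), p, e.strings(h)) }
}
```

This only helps if the partitions for a task set are serialized together; with per-task serialization each task would still need its own copy of the strings.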

Anyway, these ideas are kind of out there and just brainstorming, but perhaps 
useful for trying to go really low latency, especially for streaming.  Maybe up 
your alley, [~shivaram] or [~kayousterhout]?


[jira] [Commented] (SPARK-19108) Broadcast all shared parts of tasks (to reduce task serialization time)

2017-01-06 Thread Shivaram Venkataraman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15806774#comment-15806774
 ] 

Shivaram Venkataraman commented on SPARK-19108:
---

+1 - This is a good idea. One thing I'd like to add is that it might be better 
to create one broadcast rather than two, for the sake of efficiency. For each 
broadcast variable we contact the driver to get location information and then 
initiate some fetches, so having a single broadcast variable keeps the number 
of messages lower.
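One way to picture the single-broadcast variant is to bundle the task binary and the task set metadata into one value, so each task carries one shared handle instead of two. This is only a sketch under assumptions: `TaskSetBundle` is a hypothetical name, and the type parameter `H` stands in for what would be a single Broadcast[TaskSetBundle] in Spark, so that the example has no Spark dependency.

```scala
import java.util.Properties

// Hypothetical sketch: TaskSetBundle and the type parameter H are assumptions.
object SingleBroadcastSketch {
  final case class ExecutorTaskSetMeta(
      stageId: Int,
      stageAttemptId: Int,
      properties: Properties,
      addedFiles: Map[String, String],
      addedJars: Map[String, String])

  /** Everything shared by all tasks in the task set, shipped as one unit. */
  final case class TaskSetBundle(taskBinary: Array[Byte], meta: ExecutorTaskSetMeta)

  /** Per-task data. H stands in for a single Broadcast[TaskSetBundle] handle. */
  final case class ExecutorTaskData[H](
      partitionId: Int,
      attemptNumber: Int,
      taskId: Long,
      shared: H)
}
```

With this shape an executor resolves one handle per task set, which matches the point above about keeping the number of driver lookups and fetches down.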



