[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form

2015-02-08 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310130#comment-14310130
 ] 

Sandy Ryza edited comment on SPARK-4550 at 2/8/15 9:07 PM:
---

I got a working prototype and benchmarked the ExternalSorter changes on my 
laptop.

Each run inserts a bunch of records, each a (Int, (10-character string, Int)) 
tuple, into an ExternalSorter and then calls writePartitionedFile.  The 
reported memory size is the sum of the shuffle bytes spilled (mem) metric and 
the remaining size of the collection after insertion has completed.  Results 
are averaged over three runs.

Keep in mind that the primary goal here is to reduce GC pressure, so any speed 
improvements are icing.

||Number of Records||Storing as Serialized||Memory Size||Number of 
Spills||Insert Time (ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|50 million|false|10166122563|17|101831|89960|191791|
|50 million|true|3067937592|5|76801|78361|155161|
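
For reference, the benchmark harness can be sketched roughly as below. Note that ExternalSorter is a private[spark] class, so the constructor and method signatures here are best-effort approximations of the Spark 1.2 internals, not a stable public API; treat this as an illustrative sketch only:

{code}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.util.collection.ExternalSorter

import scala.util.Random

// Approximate driver for one run; the private ExternalSorter signatures
// below are reconstructions and may not match other Spark versions.
val sc = new SparkContext(
  new SparkConf().setMaster("local").setAppName("sorter-bench"))

def timeInsert(numRecords: Int): Long = {
  // Each record is an (Int, (10-character string, Int)) tuple.
  val records = (0 until numRecords).iterator.map { i =>
    (Random.nextInt(), (Random.alphanumeric.take(10).mkString, i))
  }
  val sorter = new ExternalSorter[Int, (String, Int), (String, Int)](
    aggregator = None,
    partitioner = Some(new HashPartitioner(8)),
    ordering = None,
    serializer = None)
  val start = System.currentTimeMillis()
  sorter.insertAll(records)
  // ... followed by writePartitionedFile(...) to measure the write phase.
  System.currentTimeMillis() - start
}
{code}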


was (Author: sandyr):
I got a working prototype and benchmarked the ExternalSorter changes on my 
laptop.

Each run inserts a bunch of records, each a (Int, (10-character string, Int)) 
tuple, into an ExternalSorter and then calls writePartitionedFile.  The 
reported memory size is the sum of the shuffle bytes spilled (mem) metric and 
the remaining size of the collection after insertion has completed.  Results 
are averaged over three runs.

Keep in mind that the primary goal here is to reduce GC pressure, so any speed 
improvements are icing.

||Number of Records||Storing as Serialized||Memory Size||Number of 
Spills||Insert Time (ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|50 million|false|10166122563|17|101831|89960|191791|
|50 million|true|3067937592|5|76801|78361|155161|

> In sort-based shuffle, store map outputs in serialized form
> ---
>
> Key: SPARK-4550
> URL: https://issues.apache.org/jira/browse/SPARK-4550
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Sandy Ryza
>Assignee: Sandy Ryza
>Priority: Critical
> Attachments: SPARK-4550-design-v1.pdf
>
>
> One drawback with sort-based shuffle compared to hash-based shuffle is that 
> it ends up storing many more java objects in memory.  If Spark could store 
> map outputs in serialized form, it could
> * spill less often because the serialized form is more compact
> * reduce GC pressure
> This will only work when the serialized representations of objects are 
> independent from each other and occupy contiguous segments of memory.  E.g. 
> when Kryo reference tracking is left on, objects may contain pointers to 
> objects farther back in the stream, which means that the sort can't relocate 
> objects without corrupting them.
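
The contiguity/independence requirement described above is what makes a serialized sort possible at all: record bytes stay put in one buffer, and only a compact pointer array is sorted. A hypothetical stand-alone sketch of that layout (not Spark's actual implementation; all names here are invented for illustration):

{code}
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.ByteBuffer

// Records are appended as length-prefixed bytes to one growing buffer; a
// parallel array of (partitionId, offset) pointers is what actually gets
// sorted. "Relocating" a record means moving an 8-byte pointer, never the
// record's bytes -- which is only safe if no record's serialized form
// refers back into another record's bytes.
class SerializedSortBuffer {
  private val bytes = new ByteArrayOutputStream()
  private val out = new DataOutputStream(bytes)
  // Each pointer packs (partitionId, byteOffset) into one Long, so sorting
  // the Longs orders records by partition.
  private var pointers = Vector.empty[Long]

  def insert(partitionId: Int, record: Array[Byte]): Unit = {
    pointers :+= (partitionId.toLong << 32) | bytes.size().toLong
    out.writeInt(record.length) // length prefix
    out.write(record)
  }

  // Sort pointers by partition id; the record bytes never move.
  def partitionedRecords: Seq[(Int, Array[Byte])] = {
    val buf = ByteBuffer.wrap(bytes.toByteArray)
    pointers.sorted.map { p =>
      val (part, off) = ((p >>> 32).toInt, (p & 0xFFFFFFFFL).toInt)
      buf.position(off)
      val rec = new Array[Byte](buf.getInt())
      buf.get(rec)
      (part, rec)
    }
  }
}
{code}

This is also why Kryo reference tracking must be off for such a buffer: a back-reference would make one record's bytes depend on bytes stored earlier in the stream.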



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form

2015-02-06 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310130#comment-14310130
 ] 

Sandy Ryza edited comment on SPARK-4550 at 2/6/15 11:13 PM:


I got a working prototype and benchmarked the ExternalSorter changes on my 
laptop.

Each run inserts a bunch of records, each a (Int, (10-character string, Int)) 
tuple, into an ExternalSorter and then calls writePartitionedFile.  The 
reported memory size is the sum of the shuffle bytes spilled (mem) metric and 
the remaining size of the collection after insertion has completed.  Results 
are averaged over three runs.

Keep in mind that the primary goal here is to reduce GC pressure, so any speed 
improvements are icing.

||Number of records||Storing as Serialized||Memory Size||Number of 
Spills||Insert Time (ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|50 million|false|10166122563|17|101831|89960|191791|
|50 million|true|3067937592|5|76801|78361|155161|


was (Author: sandyr):
I got a working prototype and benchmarked the ExternalSorter changes on my 
laptop.

Each run inserts a bunch of records, each a (Int, (10-character string, Int)) 
tuple, into an ExternalSorter and then calls writePartitionedFile.  The 
reported memory size is the sum of the shuffle bytes spilled (mem) metric and 
the remaining size of the collection after insertion has completed.  Results 
are averaged over three runs.

Keep in mind that the primary goal here is to reduce GC pressure, so any speed 
improvements are icing.

||Number of records||Storing as Serialized||Memory Size||Number of 
Spills||Insert Time(ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|50 million|false|10166122563|17|101831|89960|191791|
|50 million|true|3067937592|5|76801|78361|155161|





[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form

2015-02-06 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310130#comment-14310130
 ] 

Sandy Ryza edited comment on SPARK-4550 at 2/6/15 11:08 PM:


I got a working prototype and benchmarked the ExternalSorter changes on my 
laptop.

Each run inserts a bunch of records, each a (Int, (10-character string, Int)) 
tuple, into an ExternalSorter and then calls writePartitionedFile.  The 
reported memory size is the sum of the shuffle bytes spilled (mem) metric and 
the remaining size of the collection after insertion has completed.  Results 
are averaged over three runs.

Keep in mind that the primary goal here is to reduce GC pressure, so any speed 
improvements are icing.

||Number of records||Storing as Serialized||Memory Size||Number of 
Spills||Insert Time(ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|50 million|false|10166122563|17|101831|89960|191791|
|50 million|true|3067937592|5|76801|78361|155161|


was (Author: sandyr):
I got a working prototype and benchmarked the ExternalSorter changes on my 
laptop.

Each run inserts a bunch of records, each a (Int, (10-character string, Int)) 
tuple, into an ExternalSorter and then calls writePartitionedFile.  The 
reported memory size is the sum of the shuffle bytes spilled (mem) metric and 
the remaining size of the collection after insertion has completed.  Results 
are averaged over three runs.

Keep in mind that the primary goal here is to reduce GC pressure, so any speed 
improvements are icing.

||Number of records||Storing as Serialized||Memory Size||Number of 
Spills||Insert Time(ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|50 million|false|10166122563|17|101831|89960|191791|
|50 million|true|3067937592|5|76801|78361|155161|




[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form

2015-02-04 Thread Sandy Ryza (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14304864#comment-14304864
 ] 

Sandy Ryza edited comment on SPARK-4550 at 2/5/15 12:36 AM:


I had heard rumors to that effect, so I ran some experiments and didn't find 
that to be the case:

{code}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.SparkConf
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

val ser1 = new KryoSerializer(new SparkConf)

def serialize(objs: Array[AnyRef], ser: KryoSerializer): Array[Byte] = {
  val instance = ser.newInstance
  val baos = new ByteArrayOutputStream()
  val stream = instance.serializeStream(baos)
  objs.foreach(obj => stream.writeObject(obj))
  stream.close()
  baos.toByteArray
}

// Same array instance in both tuples: if reference tracking spanned
// writeObject calls, the second occurrence would shrink to a back-reference.
val inner = (0 until 10).toArray
val bytes1 = serialize(Array((1, inner), (2, inner)), ser1)

// Distinct-but-equal arrays for comparison.
val inner1 = (0 until 10).toArray
val inner2 = (0 until 10).toArray
val bytes2 = serialize(Array((1, inner1), (2, inner2)), ser1)

// Slice out the second half of the stream and deserialize it in isolation.
val secondHalf = new Array[Byte](bytes1.size / 2)
System.arraycopy(bytes1, bytes1.size / 2, secondHalf, 0, bytes1.size / 2)

ser1.newInstance.deserialize[AnyRef](ByteBuffer.wrap(secondHalf))
{code}

A couple of observations:
* "bytes1" ends up the same size as "bytes2", implying that "inner" is not 
reference-tracked across the two writeObject calls
* The last line successfully reproduces the second object, implying that 
nothing written at the beginning of the stream is needed to deserialize 
objects further down

Are there cases or Kryo versions I'm not thinking about?
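
For contrast, plain Java serialization *does* track references across a stream unless the handle table is explicitly cleared, which is easy to demonstrate with only the JDK (this illustrates JDK behavior, not Kryo's):

{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

def streamSize(reset: Boolean): Int = {
  val baos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(baos)
  val inner = (0 until 10).toArray
  oos.writeObject((1, inner))
  if (reset) oos.reset() // drop the handle table between objects
  oos.writeObject((2, inner)) // without reset(), "inner" is a back-reference
  oos.close()
  baos.size()
}

// With a shared handle table the second tuple back-references "inner", so
// the stream is smaller -- and the second object can no longer be cut out
// of the stream and deserialized on its own.
assert(streamSize(reset = false) < streamSize(reset = true))
{code}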


was (Author: sandyr):
I had heard rumors to that effect, so I ran some experiments and didn't find 
that to be the case:

{code}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.SparkConf
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

val ser1 = new KryoSerializer(new SparkConf)

def serialize(objs: Array[AnyRef], ser: KryoSerializer): Array[Byte] = {
  val instance = ser.newInstance
  val baos = new ByteArrayOutputStream()
  val stream = instance.serializeStream(baos)
  objs.foreach(obj => stream.writeObject(obj))
  stream.close()
  baos.toByteArray
}

val inner = (0 until 10).toArray
val bytes1 = serialize(Array((1, inner), (2, inner)), ser1)

val inner1 = (0 until 10).toArray
val inner2 = (0 until 10).toArray
val bytes2 = serialize(Array((1, inner1), (2, inner2)), ser1)

val secondHalf = new Array[Byte](bytes1.size / 2)
System.arraycopy(bytes1, bytes1.size / 2, secondHalf, 0, bytes1.size / 2)

ser1.newInstance.deserialize[AnyRef](ByteBuffer.wrap(secondHalf))
{code}

bytes1 ends up the same size as bytes2, and the last line is able to 
successfully reproduce the second object.

Are there cases or Kryo versions I'm not thinking about?



