[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14304864#comment-14304864 ]

Sandy Ryza edited comment on SPARK-4550 at 2/5/15 12:36 AM:
------------------------------------------------------------

I had heard rumors to that effect, so I ran some experiments and didn't find 
that to be the case:

{code}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.SparkConf
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

val ser1 = new KryoSerializer(new SparkConf)

// Write all objects through a single serialization stream and return the bytes
def serialize(objs: Array[AnyRef], ser: KryoSerializer): Array[Byte] = {
  val instance = ser.newInstance
  val baos = new ByteArrayOutputStream()
  val stream = instance.serializeStream(baos)
  objs.foreach(obj => stream.writeObject(obj))
  stream.close()
  baos.toByteArray
}

// Two records sharing the same inner array
val inner = (0 until 100000).toArray
val bytes1 = serialize(Array((1, inner), (2, inner)), ser1)

// Two records with distinct (but identical) inner arrays
val inner1 = (0 until 100000).toArray
val inner2 = (0 until 100000).toArray
val bytes2 = serialize(Array((1, inner1), (2, inner2)), ser1)

// The two records in bytes1 serialize to the same size, so the second half of
// bytes1 is exactly the serialized form of the second record
val secondHalf = new Array[Byte](bytes1.size / 2)
System.arraycopy(bytes1, bytes1.size / 2, secondHalf, 0, bytes1.size / 2)

// Deserialize the second record on its own, without the first half of the stream
ser1.newInstance.deserialize[AnyRef](ByteBuffer.wrap(secondHalf))
{code}

A couple of observations:
* "bytes1" ends up the same size as "bytes2", implying that "inner" is not
being reference-tracked between the two writeObject calls (a possible
explanation is sketched below)
* The last line successfully reproduces the second object, implying that
nothing written at the beginning of the stream is needed to deserialize
objects later in it.
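
If it helps explain why: my working assumption (I haven't verified this
against every Kryo version) is that Kryo's autoReset flag, which defaults to
true, clears the reference table after each top-level write, so consecutive
writeObject calls on the same stream never share references. A minimal sketch
against Kryo directly (Kryo 2.x-style API; the sizes in the comments are just
what I'd expect, not measured):

{code}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

val inner = (0 until 100000).toArray

// Serialize the same two records with and without autoReset, to see whether
// the second record can back-reference the first record's inner array.
def sizeWith(autoReset: Boolean): Int = {
  val kryo = new Kryo()
  kryo.setReferences(true)      // reference tracking on
  kryo.setAutoReset(autoReset)  // reset the reference table between top-level writes?
  val output = new Output(1024, -1)
  kryo.writeClassAndObject(output, (1, inner))
  kryo.writeClassAndObject(output, (2, inner))
  output.toBytes.length
}

sizeWith(autoReset = true)   // expected: roughly twice the size of a single record
sizeWith(autoReset = false)  // expected: much smaller, the second write is mostly a reference
{code}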

Are there cases or Kryo versions I'm not thinking about?


> In sort-based shuffle, store map outputs in serialized form
> -----------------------------------------------------------
>
>                 Key: SPARK-4550
>                 URL: https://issues.apache.org/jira/browse/SPARK-4550
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.2.0
>            Reporter: Sandy Ryza
>            Assignee: Sandy Ryza
>            Priority: Critical
>         Attachments: SPARK-4550-design-v1.pdf
>
>
> One drawback of sort-based shuffle compared to hash-based shuffle is that 
> it ends up storing many more Java objects in memory.  If Spark could store 
> map outputs in serialized form, it could
> * spill less often because the serialized form is more compact
> * reduce GC pressure
> This will only work when the serialized representations of objects are 
> independent of each other and occupy contiguous segments of memory.  For 
> example, when Kryo reference tracking is left on, objects may contain 
> pointers to objects farther back in the stream, which means that the sort 
> can't relocate objects without corrupting them.
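
To make the relocation requirement above concrete, here is a toy sketch (my
own illustration, not the proposed implementation; the records and variable
names are made up): each record is written through its own serializer stream
so that its bytes are self-contained, and "relocating" a record is then just
copying its byte range, after which it still deserializes on its own.

{code}
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val ser = new KryoSerializer(new SparkConf)

// Write each record with a fresh stream so that no record refers back into
// another record's bytes, and remember each record's (offset, length).
val records: Seq[AnyRef] = Seq((3, "c"), (1, "a"), (2, "b"))
val baos = new ByteArrayOutputStream()
val pointers = records.map { r =>
  val start = baos.size
  val stream = ser.newInstance.serializeStream(baos)
  stream.writeObject(r)
  stream.close()
  (start, baos.size - start)
}
val data = baos.toByteArray

// "Sort" by visiting the records in a different order: copy each record's
// byte range out and deserialize it independently of the others.
pointers.reverse.map { case (offset, length) =>
  val rec = new Array[Byte](length)
  System.arraycopy(data, offset, rec, 0, length)
  ser.newInstance.deserialize[AnyRef](ByteBuffer.wrap(rec))
}
{code}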


