[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form
[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310130#comment-14310130 ]

Sandy Ryza edited comment on SPARK-4550 at 2/8/15 9:07 PM:
---

I got a working prototype and benchmarked the ExternalSorter changes on my laptop. Each run inserts a bunch of records, each a (Int, (10-character string, Int)) tuple, into an ExternalSorter and then calls writePartitionedFile. The reported memory size is the sum of the shuffle bytes spilled (mem) metric and the remaining size of the collection after insertion has completed. Results are averaged over three runs. Keep in mind that the primary goal here is to reduce GC pressure, so any speed improvements are icing.

||Number of Records||Storing as Serialized||Memory Size||Number of Spills||Insert Time (ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|50 million|false|10166122563|17|101831|89960|191791|
|50 million|true|3067937592|5|76801|78361|155161|

was (Author: sandyr):

I got a working prototype and benchmarked the ExternalSorter changes on my laptop. Each run inserts a bunch of records, each a (Int, (10-character string, Int)) tuple, into an ExternalSorter and then calls writePartitionedFile. The reported memory size is the sum of the shuffle bytes spilled (mem) metric and the remaining size of the collection after insertion has completed. Results are averaged over three runs. Keep in mind that the primary goal here is to reduce GC pressure, so any speed improvements are icing.

||Number of Records||Storing as Serialized||Memory Size||Number of Spills||Insert Time (ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|10 million|false|10166122563|17|101831|89960|191791|
|10 million|true|3067937592|5|76801|78361|155161|

> In sort-based shuffle, store map outputs in serialized form
> ---
>
> Key: SPARK-4550
> URL: https://issues.apache.org/jira/browse/SPARK-4550
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Affects Versions: 1.2.0
> Reporter: Sandy Ryza
> Assignee: Sandy Ryza
> Priority: Critical
> Attachments: SPARK-4550-design-v1.pdf
>
>
> One drawback with sort-based shuffle compared to hash-based shuffle is that
> it ends up storing many more java objects in memory. If Spark could store
> map outputs in serialized form, it could
> * spill less often because the serialized form is more compact
> * reduce GC pressure
> This will only work when the serialized representations of objects are
> independent from each other and occupy contiguous segments of memory. E.g.
> when Kryo reference tracking is left on, objects may contain pointers to
> objects farther back in the stream, which means that the sort can't relocate
> objects without corrupting them.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)

---
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
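The benchmark table in the latest revision of the comment is easier to compare as ratios. The following is a minimal sketch that only does arithmetic on the figures reported there; the names `Run`, `Summary`, and `ShuffleBenchmarkSummary` are made up for illustration and are not part of Spark or of the prototype.

```scala
// Illustrative arithmetic over the figures reported in the comment's table.
// Run, Summary and ShuffleBenchmarkSummary are hypothetical names, not Spark APIs.
object ShuffleBenchmarkSummary {
  final case class Run(records: String, serialized: Boolean, memoryBytes: Long,
                       spills: Int, insertMs: Long, writeMs: Long, totalMs: Long)

  final case class Summary(records: String, memoryRatio: Double,
                           spillsAvoided: Int, timeRatio: Double)

  // Rows copied from the table, in the same order (object form first, serialized second).
  val runs: Seq[Run] = Seq(
    Run("1 million", false, 194923217L, 0, 1123L, 3442L, 4566L),
    Run("1 million", true, 48694072L, 0, 1315L, 2652L, 3967L),
    Run("10 million", false, 2050514159L, 3, 26723L, 17418L, 44141L),
    Run("10 million", true, 613614392L, 1, 16501L, 17151L, 33652L),
    Run("50 million", false, 10166122563L, 17, 101831L, 89960L, 191791L),
    Run("50 million", true, 3067937592L, 5, 76801L, 78361L, 155161L)
  )

  // Pair each object-form run with the serialized run that follows it.
  val summary: Seq[Summary] = runs.grouped(2).toSeq.map {
    case Seq(objects, serialized) =>
      Summary(
        records = objects.records,
        memoryRatio = objects.memoryBytes.toDouble / serialized.memoryBytes,
        spillsAvoided = objects.spills - serialized.spills,
        timeRatio = objects.totalMs.toDouble / serialized.totalMs
      )
  }

  def main(args: Array[String]): Unit =
    summary.foreach { s =>
      println(f"${s.records}%-11s memory ${s.memoryRatio}%.2fx smaller, " +
        f"${s.spillsAvoided}%d fewer spills, total time ${s.timeRatio}%.2fx faster")
    }
}
```

On these numbers the serialized form uses roughly a third to a quarter of the memory at every size, and the total time is lower in all three cases, which is consistent with the comment's framing that the speedup is a side benefit rather than the goal.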
[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form
[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310130#comment-14310130 ]

Sandy Ryza edited comment on SPARK-4550 at 2/6/15 11:13 PM:

I got a working prototype and benchmarked the ExternalSorter changes on my laptop. Each run inserts a bunch of records, each a (Int, (10-character string, Int)) tuple, into an ExternalSorter and then calls writePartitionedFile. The reported memory size is the sum of the shuffle bytes spilled (mem) metric and the remaining size of the collection after insertion has completed. Results are averaged over three runs. Keep in mind that the primary goal here is to reduce GC pressure, so any speed improvements are icing.

||Number of records||Storing as Serialized||Memory Size||Number of Spills||Insert Time (ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|10 million|false|10166122563|17|101831|89960|191791|
|10 million|true|3067937592|5|76801|78361|155161|

was (Author: sandyr):

I got a working prototype and benchmarked the ExternalSorter changes on my laptop. Each run inserts a bunch of records, each a (Int, (10-character string, Int)) tuple, into an ExternalSorter and then calls writePartitionedFile. The reported memory size is the sum of the shuffle bytes spilled (mem) metric and the remaining size of the collection after insertion has completed. Results are averaged over three runs. Keep in mind that the primary goal here is to reduce GC pressure, so any speed improvements are icing.

||Number of records||Storing as Serialized||Memory Size||Number of Spills||Insert Time(ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|10 million|false|10166122563|17|101831|89960|191791|
|10 million|true|3067937592|5|76801|78361|155161|
[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form
[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310130#comment-14310130 ]

Sandy Ryza edited comment on SPARK-4550 at 2/6/15 11:13 PM:

I got a working prototype and benchmarked the ExternalSorter changes on my laptop. Each run inserts a bunch of records, each a (Int, (10-character string, Int)) tuple, into an ExternalSorter and then calls writePartitionedFile. The reported memory size is the sum of the shuffle bytes spilled (mem) metric and the remaining size of the collection after insertion has completed. Results are averaged over three runs. Keep in mind that the primary goal here is to reduce GC pressure, so any speed improvements are icing.

||Number of Records||Storing as Serialized||Memory Size||Number of Spills||Insert Time (ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|10 million|false|10166122563|17|101831|89960|191791|
|10 million|true|3067937592|5|76801|78361|155161|

was (Author: sandyr):

I got a working prototype and benchmarked the ExternalSorter changes on my laptop. Each run inserts a bunch of records, each a (Int, (10-character string, Int)) tuple, into an ExternalSorter and then calls writePartitionedFile. The reported memory size is the sum of the shuffle bytes spilled (mem) metric and the remaining size of the collection after insertion has completed. Results are averaged over three runs. Keep in mind that the primary goal here is to reduce GC pressure, so any speed improvements are icing.

||Number of records||Storing as Serialized||Memory Size||Number of Spills||Insert Time (ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|10 million|false|10166122563|17|101831|89960|191791|
|10 million|true|3067937592|5|76801|78361|155161|
[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form
[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310130#comment-14310130 ]

Sandy Ryza edited comment on SPARK-4550 at 2/6/15 11:08 PM:

I got a working prototype and benchmarked the ExternalSorter changes on my laptop. Each run inserts a bunch of records, each a (Int, (10-character string, Int)) tuple, into an ExternalSorter and then calls writePartitionedFile. The reported memory size is the sum of the shuffle bytes spilled (mem) metric and the remaining size of the collection after insertion has completed. Results are averaged over three runs. Keep in mind that the primary goal here is to reduce GC pressure, so any speed improvements are icing.

||Number of records||Storing as Serialized||Memory Size||Number of Spills||Insert Time(ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|false|2050514159|3|26723|17418|44141|
|10 million|true|613614392|1|16501|17151|33652|
|10 million|false|10166122563|17|101831|89960|191791|
|10 million|true|3067937592|5|76801|78361|155161|

was (Author: sandyr):

I got a working prototype and benchmarked the ExternalSorter changes on my laptop. Each run inserts a bunch of records, each a (Int, (10-character string, Int)) tuple, into an ExternalSorter and then calls writePartitionedFile. The reported memory size is the sum of the shuffle bytes spilled (mem) metric and the remaining size of the collection after insertion has completed. Results are averaged over three runs. Keep in mind that the primary goal here is to reduce GC pressure, so any speed improvements are icing.

||Number of records||Storing as Serialized||Memory Size||Number of Spills||Insert Time(ms)||Write Time (ms)||Total Time||
|1 million|false|194923217|0|1123|3442|4566|
|1 million|true|48694072|0|1315|2652|3967|
|10 million|true|2050514159|3|26723|17418|44141|
|10 million|false|613614392|1|16501|17151|33652|
|10 million|true|10166122563|17|101831|89960|191791|
|10 million|false|3067937592|5|76801|78361|155161|
[jira] [Comment Edited] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form
[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14304864#comment-14304864 ]

Sandy Ryza edited comment on SPARK-4550 at 2/5/15 12:36 AM:

I had heard rumors to that effect, so I ran some experiments and didn't find that to be the case:

{code}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.SparkConf
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

val ser1 = new KryoSerializer(new SparkConf)

def serialize(objs: Array[AnyRef], ser: KryoSerializer): Array[Byte] = {
  val instance = ser.newInstance
  val baos = new ByteArrayOutputStream()
  val stream = instance.serializeStream(baos)
  objs.foreach(obj => stream.writeObject(obj))
  stream.close()
  baos.toByteArray
}

val inner = (0 until 10).toArray
val bytes1 = serialize(Array((1, inner), (2, inner)), ser1)

val inner1 = (0 until 10).toArray
val inner2 = (0 until 10).toArray
val bytes2 = serialize(Array((1, inner1), (2, inner2)), ser1)

val secondHalf = new Array[Byte](bytes1.size / 2)
System.arraycopy(bytes1, bytes1.size / 2, secondHalf, 0, bytes1.size / 2)
ser1.newInstance.deserialize[AnyRef](ByteBuffer.wrap(secondHalf))
{code}

A couple observations:
* "bytes1" ends up the same size as "bytes2", implying that "inner" is not being reference-tracked between the two writeObject calls
* The last line is able to successfully reproduce the second object, implying that there's no information written at the beginning of the stream needed to deserialize objects later down.

Are there cases or Kryo versions I'm not thinking about?

was (Author: sandyr):

I had heard rumors to that effect, so I ran some experiments and didn't find that to be the case:

{code}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.SparkConf
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

val ser1 = new KryoSerializer(new SparkConf)

def serialize(objs: Array[AnyRef], ser: KryoSerializer): Array[Byte] = {
  val instance = ser.newInstance
  val baos = new ByteArrayOutputStream()
  val stream = instance.serializeStream(baos)
  objs.foreach(obj => stream.writeObject(obj))
  stream.close()
  baos.toByteArray
}

val inner = (0 until 10).toArray
val bytes1 = serialize(Array((1, inner), (2, inner)), ser1)

val inner1 = (0 until 10).toArray
val inner2 = (0 until 10).toArray
val bytes2 = serialize(Array((1, inner1), (2, inner2)), ser1)

val secondHalf = new Array[Byte](bytes1.size / 2)
System.arraycopy(bytes1, bytes1.size / 2, secondHalf, 0, bytes1.size / 2)
ser1.newInstance.deserialize[AnyRef](ByteBuffer.wrap(secondHalf))
{code}

bytes1 ends up the same size as bytes2, and the last line is able to successfully reproduce the second object. Are there cases or Kryo versions I'm not thinking about?
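The issue description requires that serialized records be independent of each other and occupy contiguous byte ranges, so that a sort can move them around. The sketch below illustrates that property with plain JDK streams rather than Kryo or Spark's ExternalSorter: each record is written on its own, an index remembers where its bytes start and end, and sorting by partition just copies whole byte segments. `RelocatableRecords`, `Entry`, and the method names are hypothetical and do not reflect the prototype's actual design.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// A sketch of "relocatable" serialized records using only JDK streams.
// All names here are hypothetical; this is not Spark's or the prototype's implementation.
object RelocatableRecords {
  // Where one record's bytes live inside the shared buffer.
  final case class Entry(partition: Int, offset: Int, length: Int)

  // Serialize every record independently into one buffer, recording its byte range.
  def serializeAll(records: Seq[(Int, (String, Int))]): (Array[Byte], Vector[Entry]) = {
    val baos = new ByteArrayOutputStream()
    val out = new DataOutputStream(baos)
    val index = Vector.newBuilder[Entry]
    for ((partition, (text, number)) <- records) {
      val start = out.size()
      out.writeUTF(text)
      out.writeInt(number)
      index += Entry(partition, start, out.size() - start)
    }
    out.flush()
    (baos.toByteArray, index.result())
  }

  // Sort by partition id by copying whole byte segments; no record is deserialized.
  def sortByPartition(bytes: Array[Byte], index: Vector[Entry]): (Array[Byte], Vector[Entry]) = {
    val sortedBytes = new Array[Byte](bytes.length)
    var pos = 0
    val sortedIndex = index.sortBy(_.partition).map { e =>
      System.arraycopy(bytes, e.offset, sortedBytes, pos, e.length)
      val moved = e.copy(offset = pos)
      pos += e.length
      moved
    }
    (sortedBytes, sortedIndex)
  }

  // Deserialize a single record from its (possibly relocated) byte range.
  def read(bytes: Array[Byte], e: Entry): (String, Int) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes, e.offset, e.length))
    (in.readUTF(), in.readInt())
  }

  def main(args: Array[String]): Unit = {
    val (bytes, index) = serializeAll(Seq((2, ("bb", 20)), (0, ("a", 10)), (1, ("ccc", 30))))
    val (sortedBytes, sortedIndex) = sortByPartition(bytes, index)
    sortedIndex.foreach(e => println(s"partition ${e.partition}: ${read(sortedBytes, e)}"))
  }
}
```

Relocation is safe here only because no record's bytes refer to another record. If the serializer emitted back-references to earlier objects in the stream, as the issue description warns can happen with Kryo reference tracking, copying a segment to a new offset would leave those references pointing at the wrong data.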