Re: Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread Aaron Davidson
Ah, this is an issue particular to sort-based shuffle (it was not the case
for hash-based shuffle, which would immediately serialize each record rather
than holding many in memory at once). The documentation should be updated.
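
A minimal, self-contained illustration of the failure mode (not Spark's actual
shuffle code, just a toy showing the reference-vs-copy behaviour of a reused
Writable):

import org.apache.hadoop.io.BytesWritable
import scala.collection.mutable.ArrayBuffer

// Sketch only: mimic a RecordReader that reuses one BytesWritable while a
// buffer (like the one sort-based shuffle keeps before serializing/spilling)
// stores references to it.
val reused = new BytesWritable()
val buffered = ArrayBuffer[BytesWritable]()
for (b <- Seq[Byte](1, 2, 3)) {
  reused.set(Array(b), 0, 1) // the reader mutates the same instance in place
  buffered += reused         // the buffer stores a reference, not a copy
}
// Every buffered entry now reflects the last record read:
println(buffered.map(_.getBytes()(0))) // ArrayBuffer(3, 3, 3)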

On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Hi Andrew,

 Here's a note from the doc for sequenceFile:

 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.

 This should probably say directly caching *or directly shuffling*. To sort
 directly from a sequence file, the records need to be cloned first.

 -Sandy


 On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson 
 andrew.row...@thomsonreuters.com wrote:

 I've found a strange issue when trying to sort a lot of data in HDFS using
 Spark 1.2.0 (CDH 5.3.0). My data is in SequenceFiles and the key is a class
 that derives from BytesWritable (the value is also a BytesWritable). I'm
 using a custom Kryo serializer to serialize the underlying byte array (it
 basically writes the length followed by the byte array).

 My spark job looks like this:

 spark.sequenceFile(inputPath, classOf[CustomKey],
 classOf[BytesWritable]).sortByKey().map(t =>
 t._1).saveAsTextFile(outputPath)

 CustomKey extends BytesWritable, adds a toString method and some other
 helper methods that extract and convert parts of the underlying byte[].

 This should simply output a series of text files containing the sorted list
 of keys. The problem is that under certain circumstances I get many duplicate
 keys. The number of records output is correct, but it appears that large
 chunks of the output are simply copies of the last record in that chunk.
 E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].

 This appears to happen only above certain input data volumes, and it
 appears
 to be when shuffle spills. For a job where shuffle spill for memory and
 disk
 = 0B, the data is correct. If there is any spill, I see the duplicate
 behaviour. Oddly, the shuffle write is much smaller when there's a spill.
 E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
 whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle
 write.
 I'm guessing some sort of compression is happening on duplicate identical
 values?

 Oddly, I can fix this issue if I adjust my scala code to insert a map step
 before the call to sortByKey():

 .map(t => (new CustomKey(t._1), t._2))

 This constructor is just:

 public CustomKey(CustomKey left) { this.set(left); }

 Why does this work? I've no idea.

 The spark job is running in yarn-client mode with all the default
 configuration values set. Using the external shuffle service and disabling
 spill compression makes no difference.

 Is this a bug?








Re: Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread Sandy Ryza
Hi Andrew,

Here's a note from the doc for sequenceFile:

* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.

This should probably say directly caching *or directly shuffling*. To sort
directly from a sequence file, the records need to be cloned first.
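
For example, something like this (a rough sketch, assuming the key class has a
copy constructor and that the BytesWritable value also needs to be copied):

// Sketch only: clone each record as it comes off the RecordReader so the
// records buffered by sort-based shuffle are distinct objects rather than
// references to Hadoop's single reused Writable.
spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
  .map { case (k, v) =>
    // Deep-copy key and value; getBytes may be padded, so trim to getLength.
    (new CustomKey(k), new BytesWritable(java.util.Arrays.copyOf(v.getBytes, v.getLength)))
  }
  .sortByKey()
  .map(_._1)
  .saveAsTextFile(outputPath)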

-Sandy


On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson 
andrew.row...@thomsonreuters.com wrote:

 I've found a strange issue when trying to sort a lot of data in HDFS using
 Spark 1.2.0 (CDH 5.3.0). My data is in SequenceFiles and the key is a class
 that derives from BytesWritable (the value is also a BytesWritable). I'm
 using a custom Kryo serializer to serialize the underlying byte array (it
 basically writes the length followed by the byte array).

 My spark job looks like this:

 spark.sequenceFile(inputPath, classOf[CustomKey],
 classOf[BytesWritable]).sortByKey().map(t =>
 t._1).saveAsTextFile(outputPath)

 CustomKey extends BytesWritable, adds a toString method and some other
 helper methods that extract and convert parts of the underlying byte[].

 This should simply output a series of text files containing the sorted list
 of keys. The problem is that under certain circumstances I get many duplicate
 keys. The number of records output is correct, but it appears that large
 chunks of the output are simply copies of the last record in that chunk.
 E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].

 This appears to happen only above certain input data volumes, and it
 appears
 to be when shuffle spills. For a job where shuffle spill for memory and
 disk
 = 0B, the data is correct. If there is any spill, I see the duplicate
 behaviour. Oddly, the shuffle write is much smaller when there's a spill.
 E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
 whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle
 write.
 I'm guessing some sort of compression is happening on duplicate identical
 values?

 Oddly, I can fix this issue if I adjust my scala code to insert a map step
 before the call to sortByKey():

 .map(t => (new CustomKey(t._1), t._2))

 This constructor is just:

 public CustomKey(CustomKey left) { this.set(left); }

 Why does this work? I've no idea.

 The spark job is running in yarn-client mode with all the default
 configuration values set. Using the external shuffle service and disabling
 spill compression makes no difference.

 Is this a bug?







Re: Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread Sandy Ryza
Filed https://issues.apache.org/jira/browse/SPARK-5500 for this.

-Sandy

On Fri, Jan 30, 2015 at 11:59 AM, Aaron Davidson ilike...@gmail.com wrote:

 Ah, this is an issue particular to sort-based shuffle (it was not the case
 for hash-based shuffle, which would immediately serialize each record rather
 than holding many in memory at once). The documentation should be updated.

 On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Andrew,

 Here's a note from the doc for sequenceFile:

 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.

 This should probably say directly caching *or directly shuffling*. To sort
 directly from a sequence file, the records need to be cloned first.

 -Sandy


 On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson 
 andrew.row...@thomsonreuters.com wrote:

 I've found a strange issue when trying to sort a lot of data in HDFS using
 Spark 1.2.0 (CDH 5.3.0). My data is in SequenceFiles and the key is a class
 that derives from BytesWritable (the value is also a BytesWritable). I'm
 using a custom Kryo serializer to serialize the underlying byte array (it
 basically writes the length followed by the byte array).

 My spark job looks like this:

 spark.sequenceFile(inputPath, classOf[CustomKey],
 classOf[BytesWritable]).sortByKey().map(t =>
 t._1).saveAsTextFile(outputPath)

 CustomKey extends BytesWritable, adds a toString method and some other
 helper methods that extract and convert parts of the underlying byte[].

 This should simply output a series of text files containing the sorted list
 of keys. The problem is that under certain circumstances I get many duplicate
 keys. The number of records output is correct, but it appears that large
 chunks of the output are simply copies of the last record in that chunk.
 E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].

 This appears to happen only above certain input data volumes, and it
 appears
 to be when shuffle spills. For a job where shuffle spill for memory and
 disk
 = 0B, the data is correct. If there is any spill, I see the duplicate
 behaviour. Oddly, the shuffle write is much smaller when there's a spill.
 E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
 whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle
 write.
 I'm guessing some sort of compression is happening on duplicate identical
 values?

 Oddly, I can fix this issue if I adjust my scala code to insert a map
 step
 before the call to sortByKey():

 .map(t => (new CustomKey(t._1), t._2))

 This constructor is just:

 public CustomKey(CustomKey left) { this.set(left); }

 Why does this work? I've no idea.

 The spark job is running in yarn-client mode with all the default
 configuration values set. Using the external shuffle service and
 disabling
 spill compression makes no difference.

 Is this a bug?









Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread andrew.rowson
I've found a strange issue when trying to sort a lot of data in HDFS using
Spark 1.2.0 (CDH 5.3.0). My data is in SequenceFiles and the key is a class
that derives from BytesWritable (the value is also a BytesWritable). I'm
using a custom Kryo serializer to serialize the underlying byte array (it
basically writes the length followed by the byte array).
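
Roughly, the serializer looks something like this (a sketch of the idea with
an assumed class name, not the exact code):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.io.BytesWritable

// Sketch: write the length, then the raw bytes; read back symmetrically.
class BytesWritableSerializer extends Serializer[BytesWritable] {
  override def write(kryo: Kryo, output: Output, bw: BytesWritable): Unit = {
    output.writeInt(bw.getLength)
    output.writeBytes(bw.getBytes, 0, bw.getLength)
  }
  override def read(kryo: Kryo, input: Input, clazz: Class[BytesWritable]): BytesWritable = {
    val len = input.readInt()
    new BytesWritable(input.readBytes(len))
  }
}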

My spark job looks like this:

spark.sequenceFile(inputPath, classOf[CustomKey],
classOf[BytesWritable]).sortByKey().map(t =>
t._1).saveAsTextFile(outputPath)

CustomKey extends BytesWritable, adds a toString method and some other
helper methods that extract and convert parts of the underlying byte[].

This should simply output a series of text files containing the sorted list
of keys. The problem is that under certain circumstances I get many duplicate
keys. The number of records output is correct, but it appears that large
chunks of the output are simply copies of the last record in that chunk.
E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].

This appears to happen only above certain input data volumes, and it appears
to be when shuffle spills. For a job where shuffle spill for memory and disk
= 0B, the data is correct. If there is any spill, I see the duplicate
behaviour. Oddly, the shuffle write is much smaller when there's a spill.
E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle write.
I'm guessing some sort of compression is happening on duplicate identical
values?

Oddly, I can fix this issue if I adjust my scala code to insert a map step
before the call to sortByKey():

.map(t => (new CustomKey(t._1), t._2))

This constructor is just:

public CustomKey(CustomKey left) { this.set(left); }

Why does this work? I've no idea.

The spark job is running in yarn-client mode with all the default
configuration values set. Using the external shuffle service and disabling
spill compression makes no difference.

Is this a bug?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-key-when-sorting-BytesWritable-with-Kryo-tp21447.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org