Re: Duplicate key when sorting BytesWritable with Kryo?
Ah, this is in particular an issue with sort-based shuffle (it was not the case for hash-based shuffle, which would immediately serialize each record rather than holding many in memory at once). The documentation should be updated.

On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

> This should probably say directly caching *or directly shuffling*. To sort directly from a sequence file, the records need to be cloned first.
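To make the failure mode concrete, here is a minimal standalone sketch (plain Scala plus the Hadoop client library; it is not Spark's actual shuffle code, and the buffer and loop are hypothetical stand-ins) of what goes wrong when a reused Writable is held by reference instead of being copied:

    import org.apache.hadoop.io.BytesWritable
    import scala.collection.mutable.ArrayBuffer

    object ReuseHazardSketch {
      def main(args: Array[String]): Unit = {
        val reused   = new BytesWritable()              // one object, mutated per record, like Hadoop's RecordReader
        val buffered = ArrayBuffer.empty[BytesWritable] // stand-in for records held in memory before they are serialized

        for (i <- 1 to 3) {
          reused.set(Array(i.toByte), 0, 1) // "read" the next record by overwriting the same instance
          buffered += reused                // buffering the reference, not a copy
        }

        // Every buffered element now reflects the last value written: prints "3, 3, 3"
        println(buffered.map(_.getBytes()(0)).mkString(", "))
      }
    }

With hash-based shuffle each record was serialized as soon as it arrived, so the later mutations were never visible; once many deserialized records are held in memory at once, every held reference tracks the single object the RecordReader keeps mutating.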
Re: Duplicate key when sorting BytesWritable with Kryo?
Hi Andrew,

Here's a note from the doc for sequenceFile:

 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.

This should probably say directly caching *or directly shuffling*. To sort directly from a sequence file, the records need to be cloned first.

-Sandy

On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson andrew.row...@thomsonreuters.com wrote:

> The problem is that under certain circumstances I get many duplicate keys.
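A sketch of the kind of cloning map described here, reusing the spark context, inputPath, outputPath, and CustomKey (with its copy constructor) from the original message further down the thread; copyBytes() is the BytesWritable helper that returns a right-sized copy of the backing array. This is an assumption about how the fix would be wired in, not code from the thread:

    import org.apache.hadoop.io.BytesWritable

    val sortedKeys = spark
      .sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
      .map { case (k, v) =>
        // Clone key and value before they enter the shuffle, so the sort never
        // holds references to the single Writable the RecordReader keeps reusing.
        (new CustomKey(k), new BytesWritable(v.copyBytes()))
      }
      .sortByKey()
      .map(_._1)

    sortedKeys.saveAsTextFile(outputPath)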
Re: Duplicate key when sorting BytesWritable with Kryo?
Filed https://issues.apache.org/jira/browse/SPARK-5500 for this.

-Sandy

On Fri, Jan 30, 2015 at 11:59 AM, Aaron Davidson ilike...@gmail.com wrote:

> Ah, this is in particular an issue with sort-based shuffle (it was not the case for hash-based shuffle, which would immediately serialize each record rather than holding many in memory at once). The documentation should be updated.
Duplicate key when sorting BytesWritable with Kryo?
I've found a strange issue when trying to sort a lot of data in HDFS using Spark 1.2.0 (CDH 5.3.0). My data is in sequence files and the key is a class that derives from BytesWritable (the value is also a BytesWritable). I'm using a custom KryoSerializer to serialize the underlying byte array (basically, write the length and then the byte array).

My Spark job looks like this:

    spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
      .sortByKey()
      .map(t => t._1)
      .saveAsTextFile(outputPath)

CustomKey extends BytesWritable and adds a toString method plus some other helper methods that extract and convert parts of the underlying byte[]. This should simply output a series of text files containing the sorted list of keys.

The problem is that under certain circumstances I get many duplicate keys. The number of records output is correct, but it appears that large chunks of the output are simply copies of the last record in that chunk. E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].

This appears to happen only above certain input data volumes, and it appears to happen when the shuffle spills. For a job where shuffle spill for memory and disk = 0B, the data is correct. If there is any spill, I see the duplicate behaviour. Oddly, the shuffle write is much smaller when there's a spill: the non-spill job has 18.8 GB of input and 14.9 GB of shuffle write, whereas the spill job has 24.2 GB of input and only 4.9 GB of shuffle write. I'm guessing some sort of compression is happening on duplicate identical values?

Oddly, I can fix this issue if I insert a map step into my Scala code before the call to sortByKey():

    .map(t => (new CustomKey(t._1), t._2))

This constructor is just:

    public CustomKey(CustomKey left) { this.set(left); }

Why does this work? I've no idea.

The Spark job is running in yarn-client mode with all the default configuration values set. Using the external shuffle service and disabling spill compression makes no difference.

Is this a bug?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-key-when-sorting-BytesWritable-with-Kryo-tp21447.html
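For reference, a serializer along the lines described in the message ("write the length and the byte array") might look roughly like the sketch below. The class name, and the assumption that the same approach is used for the CustomKey subclass, are hypothetical; only the Kryo and Hadoop calls shown are real API.

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.hadoop.io.BytesWritable

    // Hypothetical length-prefixed Kryo serializer for BytesWritable-style records.
    class BytesWritableSerializer extends Serializer[BytesWritable] {
      override def write(kryo: Kryo, output: Output, bw: BytesWritable): Unit = {
        output.writeInt(bw.getLength)                   // write the length...
        output.writeBytes(bw.getBytes, 0, bw.getLength) // ...then only the valid bytes
      }

      override def read(kryo: Kryo, input: Input, cls: Class[BytesWritable]): BytesWritable = {
        val len = input.readInt()
        new BytesWritable(input.readBytes(len)) // allocates a fresh array and object per record
      }
    }

Even with a serializer like this allocating fresh objects on the read side, the replies above point out that the sort-based shuffle holds many deserialized records in memory before serializing them, which is why the records need to be cloned before sortByKey().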