Hi Andrew,

Here's a note from the doc for sequenceFile:

    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.

This should probably say "directly caching *or directly shuffling*".  To
sort directly from a sequence file, the records need to be cloned first.
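
For example, here's a minimal sketch of the clone-before-shuffle workaround,
assuming CustomKey has a copy constructor (as in the message below) and using
Hadoop's BytesWritable.copyBytes() to copy the value as well:

    // Copy each record before the shuffle so the sort buffers don't hold
    // many references to the single Writable instance that Hadoop's
    // RecordReader re-uses.
    spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
      .map(t => (new CustomKey(t._1), new BytesWritable(t._2.copyBytes())))
      .sortByKey()
      .map(t => t._1)
      .saveAsTextFile(outputPath)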

-Sandy


On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson <
andrew.row...@thomsonreuters.com> wrote:

> I've found a strange issue when trying to sort a lot of data in HDFS using
> spark 1.2.0 (CDH5.3.0). My data is in sequencefiles and the key is a class
> that derives from BytesWritable (the value is also a BytesWritable). I'm
> using a custom KryoSerializer to serialize the underlying byte array
> (basically write the length and the byte array).
>
> My spark job looks like this:
>
> spark.sequenceFile(inputPath, classOf[CustomKey],
> classOf[BytesWritable]).sortByKey().map(t =>
> t._1).saveAsTextFile(outputPath)
>
> CustomKey extends BytesWritable, adds a toString method and some other
> helper methods that extract and convert parts of the underlying byte[].
>
> This should simply output a series of textfiles which contain the sorted
> list of keys. The problem is that under certain circumstances I get many
> duplicate keys. The number of records output is correct, but it appears
> that
> large chunks of the output are simply copies of the last record in that
> chunk. E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].
>
> This appears to happen only above certain input data volumes, and it
> appears
> to be when shuffle spills. For a job where shuffle spill for memory and
> disk
> = 0B, the data is correct. If there is any spill, I see the duplicate
> behaviour. Oddly, the shuffle write is much smaller when there's a spill.
> E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
> whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle
> write.
> I'm guessing some sort of compression is happening on duplicate identical
> values?
>
> Oddly, I can fix this issue if I adjust my scala code to insert a map step
> before the call to sortByKey():
>
> .map(t => (new CustomKey(t._1),t._2))
>
> This constructor is just:
>
> public CustomKey(CustomKey left) { this.set(left); }
>
> Why does this work? I've no idea.
>
> The spark job is running in yarn-client mode with all the default
> configuration values set. Using the external shuffle service and disabling
> spill compression makes no difference.
>
> Is this a bug?
>
