Hi Andrew,

Here's a note from the doc for sequenceFile:
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.

This should probably say "directly caching *or directly shuffling*". To sort
directly from a sequence file, the records need to be cloned first. (A minimal
sketch of that clone-before-shuffle map is appended after the quoted message
below.)

-Sandy

On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson <
andrew.row...@thomsonreuters.com> wrote:

> I've found a strange issue when trying to sort a lot of data in HDFS using
> Spark 1.2.0 (CDH 5.3.0). My data is in sequence files and the key is a class
> that derives from BytesWritable (the value is also a BytesWritable). I'm
> using a custom KryoSerializer to serialize the underlying byte array
> (basically write the length and the byte array).
>
> My Spark job looks like this:
>
> spark.sequenceFile(inputPath, classOf[CustomKey],
> classOf[BytesWritable]).sortByKey().map(t =>
> t._1).saveAsTextFile(outputPath)
>
> CustomKey extends BytesWritable, adds a toString method and some other
> helper methods that extract and convert parts of the underlying byte[].
>
> This should simply output a series of text files containing the sorted
> list of keys. The problem is that under certain circumstances I get many
> duplicate keys. The number of records output is correct, but it appears
> that large chunks of the output are simply copies of the last record in
> that chunk, e.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].
>
> This appears to happen only above certain input data volumes, and it
> appears to be when the shuffle spills. For a job where shuffle spill for
> memory and disk = 0B, the data is correct. If there is any spill, I see the
> duplicate behaviour. Oddly, the shuffle write is much smaller when there's
> a spill. E.g. the non-spill job has 18.8 GB of input and 14.9 GB of shuffle
> write, whereas the spill job has 24.2 GB of input and only 4.9 GB of shuffle
> write. I'm guessing some sort of compression is happening on duplicate
> identical values?
>
> Oddly, I can fix this issue if I adjust my Scala code to insert a map step
> before the call to sortByKey():
>
> .map(t => (new CustomKey(t._1), t._2))
>
> This constructor is just:
>
> public CustomKey(CustomKey left) { this.set(left); }
>
> Why does this work? I've no idea.
>
> The Spark job is running in yarn-client mode with all the default
> configuration values set. Using the external shuffle service and disabling
> spill compression makes no difference.
>
> Is this a bug?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-key-when-sorting-BytesWritable-with-Kryo-tp21447.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
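For reference, here is a minimal sketch of that clone-before-shuffle
workaround in Scala. The CustomKey stand-in, the explicit Ordering, and the
object/argument names are assumptions made for illustration (the copy
constructor mirrors the one Andrew quotes), and the custom Kryo serializer
from the original job is omitted because it isn't needed to show the cloning:

import org.apache.hadoop.io.BytesWritable
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for Andrew's key class: extends BytesWritable and adds
// the copy constructor he quotes (public CustomKey(CustomKey left) { this.set(left); }).
class CustomKey() extends BytesWritable {
  def this(other: CustomKey) = { this(); set(other) }
  override def toString: String = getBytes.take(getLength).map("%02x".format(_)).mkString
}

object SortSequenceFileKeys {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sort-sequencefile-keys"))

    // sortByKey needs an Ordering[CustomKey] in scope; the raw byte comparison
    // inherited from BytesWritable is used here purely for illustration.
    implicit val keyOrdering: Ordering[CustomKey] =
      Ordering.fromLessThan((a: CustomKey, b: CustomKey) => a.compareTo(b) < 0)

    sc.sequenceFile(args(0), classOf[CustomKey], classOf[BytesWritable])
      // Clone key and value before the shuffle: Hadoop's RecordReader re-uses a
      // single Writable instance per record type, so without this copy the sort
      // sees many references to whatever that instance last held.
      .map { case (k, v) => (new CustomKey(k), new BytesWritable(v.copyBytes())) }
      .sortByKey()
      .map(_._1)
      .saveAsTextFile(args(1))

    sc.stop()
  }
}

The extra map costs one allocation per record, but it means the shuffle (and
any cache) holds distinct objects rather than many references to the single
Writable instance the RecordReader keeps mutating.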