Filed https://issues.apache.org/jira/browse/SPARK-5500 for this.
-Sandy

On Fri, Jan 30, 2015 at 11:59 AM, Aaron Davidson <ilike...@gmail.com> wrote:

> Ah, this is in particular an issue with sort-based shuffle (it was not
> the case for hash-based shuffle, which would immediately serialize each
> record rather than holding many in memory at once). The documentation
> should be updated.
>
> On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>
>> Hi Andrew,
>>
>> Here's a note from the doc for sequenceFile:
>>
>>   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
>>   * record, directly caching the returned RDD will create many references to the same object.
>>   * If you plan to directly cache Hadoop writable objects, you should first copy them using
>>   * a `map` function.
>>
>> This should probably say "directly caching *or directly shuffling*". To
>> sort directly from a sequence file, the records need to be cloned first.
>>
>> -Sandy
>>
>> On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson <andrew.row...@thomsonreuters.com> wrote:
>>
>>> I've found a strange issue when trying to sort a lot of data in HDFS using
>>> Spark 1.2.0 (CDH 5.3.0). My data is in SequenceFiles and the key is a class
>>> that derives from BytesWritable (the value is also a BytesWritable). I'm
>>> using a custom KryoSerializer to serialize the underlying byte array
>>> (basically, write the length and then the bytes).
>>>
>>> My Spark job looks like this:
>>>
>>>   spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
>>>     .sortByKey()
>>>     .map(t => t._1)
>>>     .saveAsTextFile(outputPath)
>>>
>>> CustomKey extends BytesWritable and adds a toString method and some other
>>> helper methods that extract and convert parts of the underlying byte[].
>>>
>>> This should simply output a series of text files containing the sorted
>>> list of keys. The problem is that under certain circumstances I get many
>>> duplicate keys. The number of records output is correct, but it appears
>>> that large chunks of the output are simply copies of the last record in
>>> that chunk. E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].
>>>
>>> This appears to happen only above certain input data volumes, and it
>>> appears to happen when the shuffle spills. For a job where shuffle spill
>>> (memory and disk) = 0 B, the data is correct. If there is any spill, I see
>>> the duplicate behaviour. Oddly, the shuffle write is much smaller when
>>> there's a spill. E.g. the non-spill job has 18.8 GB of input and 14.9 GB
>>> of shuffle write, whereas the spill job has 24.2 GB of input and only
>>> 4.9 GB of shuffle write. I'm guessing some sort of compression is
>>> happening on duplicate identical values?
>>>
>>> Oddly, I can fix this issue if I adjust my Scala code to insert a map step
>>> before the call to sortByKey():
>>>
>>>   .map(t => (new CustomKey(t._1), t._2))
>>>
>>> This constructor is just:
>>>
>>>   public CustomKey(CustomKey left) { this.set(left); }
>>>
>>> Why does this work? I've no idea.
>>>
>>> The Spark job is running in yarn-client mode with all the default
>>> configuration values set. Using the external shuffle service and
>>> disabling spill compression makes no difference.
>>>
>>> Is this a bug?
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-key-when-sorting-BytesWritable-with-Kryo-tp21447.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
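
For reference, here is a minimal sketch of the copy-before-shuffle pattern the doc note describes, which is essentially Andrew's workaround generalized. It uses plain BytesWritable for both key and value rather than the CustomKey class from the thread, and the paths and app name are placeholders, not anything from the original job; assume Spark 1.2-era imports.

  import org.apache.hadoop.io.BytesWritable
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._ // sortByKey implicits on Spark 1.2

  object CloneBeforeSort {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("clone-before-sort"))

      // Placeholder paths -- substitute real HDFS locations.
      val inputPath = "hdfs:///data/input"
      val outputPath = "hdfs:///data/sorted-keys"

      // BytesWritable implements Comparable[BinaryComparable] rather than
      // Comparable[BytesWritable], so supply the key Ordering explicitly.
      implicit val bwOrdering: Ordering[BytesWritable] = new Ordering[BytesWritable] {
        def compare(a: BytesWritable, b: BytesWritable): Int = a.compareTo(b)
      }

      sc.sequenceFile(inputPath, classOf[BytesWritable], classOf[BytesWritable])
        // Hadoop's RecordReader reuses one Writable instance per partition, and
        // sort-based shuffle buffers many deserialized records in memory before
        // spilling, so deep-copy each record before it reaches the sort.
        .map { case (k, v) =>
          (new BytesWritable(k.copyBytes()), new BytesWritable(v.copyBytes()))
        }
        .sortByKey()
        .map(_._1)
        .saveAsTextFile(outputPath)

      sc.stop()
    }
  }

Without the copying map, every buffered pair ends up pointing at the single Writable instance the RecordReader keeps mutating, which matches the "chunks of the last record" symptom above.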