Filed https://issues.apache.org/jira/browse/SPARK-5500 for this.

-Sandy

On Fri, Jan 30, 2015 at 11:59 AM, Aaron Davidson <ilike...@gmail.com> wrote:

> Ah, this is an issue particular to sort-based shuffle (it was not a
> problem for hash-based shuffle, which serialized each record immediately
> rather than holding many deserialized records in memory at once). The
> documentation should be updated.
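>
> For illustration, here is a minimal sketch of the reuse problem in plain
> Scala (no Spark involved): the reader hands back the same Writable
> instance for every record, and a buffering consumer, like sort-based
> shuffle, keeps references rather than copies, so every buffered entry
> ends up showing the last value read.
>
> import org.apache.hadoop.io.BytesWritable
> import scala.collection.mutable.ArrayBuffer
>
> val reused = new BytesWritable()      // stands in for the reader's single key object
> val buffered = ArrayBuffer[BytesWritable]()
> for (i <- 1 to 3) {
>   reused.set(Array(i.toByte), 0, 1)   // the "reader" overwrites the shared object in place
>   buffered += reused                  // the buffer keeps a reference, not a copy
> }
> println(buffered.map(_.getBytes()(0)).mkString(","))   // prints 3,3,3 rather than 1,2,3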
>
> On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza <sandy.r...@cloudera.com>
> wrote:
>
>> Hi Andrew,
>>
>> Here's a note from the doc for sequenceFile:
>>
>>     * '''Note:''' Because Hadoop's RecordReader class re-uses the same
>> Writable object for each
>>     * record, directly caching the returned RDD will create many
>> references to the same object.
>>     * If you plan to directly cache Hadoop writable objects, you should
>> first copy them using
>>     * a `map` function.
>>
>> This should probably say "directly caching *or directly shuffling*".  To
>> sort directly from a sequence file, the records need to be cloned first.
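>>
>> A minimal sketch of that cloning (my own illustration, assuming plain
>> BytesWritable keys and values, with spark and inputPath standing in for
>> the SparkContext and the input path): copyBytes() allocates a fresh
>> array, so the cached or shuffled records no longer alias the reader's
>> single reused object.
>>
>> import org.apache.hadoop.io.BytesWritable
>>
>> val safe = spark.sequenceFile(inputPath, classOf[BytesWritable], classOf[BytesWritable])
>>   .map { case (k, v) =>
>>     (new BytesWritable(k.copyBytes()), new BytesWritable(v.copyBytes()))
>>   }
>> safe.cache()   // now safe to cache or shuffle, since each record owns its bytes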
>>
>> -Sandy
>>
>>
>> On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson <
>> andrew.row...@thomsonreuters.com> wrote:
>>
>>> I've found a strange issue when trying to sort a lot of data in HDFS
>>> using Spark 1.2.0 (CDH 5.3.0). My data is in SequenceFiles, and the key
>>> is a class that derives from BytesWritable (the value is also a
>>> BytesWritable). I'm using a custom KryoSerializer to serialize the
>>> underlying byte array (it basically writes the length and then the
>>> bytes).
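>>>
>>> Roughly what I mean (a sketch, not the exact class I'm running):
>>>
>>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>>> import com.esotericsoftware.kryo.io.{Input, Output}
>>> import org.apache.hadoop.io.BytesWritable
>>>
>>> class BytesWritableSerializer extends Serializer[BytesWritable] {
>>>   override def write(kryo: Kryo, out: Output, bw: BytesWritable): Unit = {
>>>     out.writeInt(bw.getLength)
>>>     out.writeBytes(bw.getBytes, 0, bw.getLength)  // only the valid prefix of the backing array
>>>   }
>>>   override def read(kryo: Kryo, in: Input, cls: Class[BytesWritable]): BytesWritable = {
>>>     val len = in.readInt()
>>>     new BytesWritable(in.readBytes(len))          // a fresh object and array per record
>>>   }
>>> }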
>>>
>>> My Spark job looks like this:
>>>
>>> spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
>>>   .sortByKey()
>>>   .map(t => t._1)
>>>   .saveAsTextFile(outputPath)
>>>
>>> CustomKey extends BytesWritable, adds a toString method and some other
>>> helper methods that extract and convert parts of the underlying byte[].
>>>
>>> This should simply output a series of text files containing the sorted
>>> list of keys. The problem is that under certain circumstances I get many
>>> duplicate keys. The number of records output is correct, but it appears
>>> that large chunks of the output are simply copies of the last record in
>>> that chunk. E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see
>>> [9,9,9,9,9,9,9,9,9].
>>>
>>> This appears to happen only above certain input data volumes, and it
>>> appears to coincide with shuffle spills. For a job where shuffle spill
>>> (memory and disk) is 0 B, the data is correct. If there is any spill, I
>>> see the duplicate behaviour. Oddly, the shuffle write is much smaller
>>> when there's a spill. E.g. the non-spill job has 18.8 GB of input and
>>> 14.9 GB of shuffle write, whereas the spill job has 24.2 GB of input and
>>> only 4.9 GB of shuffle write. I'm guessing some sort of compression is
>>> happening on the identical duplicate values?
>>>
>>> Oddly, I can fix this issue if I adjust my Scala code to insert a map
>>> step before the call to sortByKey():
>>>
>>> .map(t => (new CustomKey(t._1),t._2))
>>>
>>> This constructor is just:
>>>
>>> public CustomKey(CustomKey left) { this.set(left); }
>>>
>>> Why does this work? I've no idea.
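>>>
>>> For reference, the full job with that extra map in place:
>>>
>>> spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
>>>   .map(t => (new CustomKey(t._1), t._2))   // the inserted copy step
>>>   .sortByKey()
>>>   .map(t => t._1)
>>>   .saveAsTextFile(outputPath)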
>>>
>>> The Spark job is running in yarn-client mode with all the default
>>> configuration values. Neither using the external shuffle service nor
>>> disabling spill compression makes any difference.
>>>
>>> Is this a bug?
>>>
>>
>
