Based on Ben's helpful error description, I managed to reproduce this bug
and found the root cause:

There's a bug in MemoryStore's PartiallySerializedBlock class: it doesn't
close the serialization stream before attempting to deserialize its
serialized values, so it misses any data still held in the serializer's
internal buffers (this can happen with KryoSerializer, which Spark was
automatically using to serialize RDDs of byte arrays). I've reported this
as https://issues.apache.org/jira/browse/SPARK-17491 and have submitted
https://github.com/apache/spark/pull/15043 to fix it (I'm still planning
to add more tests to that patch).
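
To see why an unclosed stream matters, here's a minimal, hypothetical
Python sketch of the failure mode (plain Python, not Spark's actual Scala
internals; the 64 KB buffer size and the record sizes are made up for
illustration): when a buffering serializer's output is read back before the
stream has been flushed or closed, whatever is still sitting in the
internal buffer is silently dropped.

    # Hypothetical illustration, not Spark code: reading serialized bytes
    # before the buffering stream is flushed/closed loses the buffered tail.
    import io
    import pickle

    underlying = io.BytesIO()
    # The 64 KB buffer stands in for the serializer's internal buffering
    # (Kryo keeps a similar internal output buffer).
    buffered = io.BufferedWriter(underlying, buffer_size=64 * 1024)

    records = [b"x" * 128 for _ in range(100)]
    for record in records:
        buffered.write(pickle.dumps(record))

    # Reading the underlying bytes *before* closing/flushing the stream
    # misses the buffered data -- in this example, all of it.
    too_early = underlying.getvalue()   # b'' at this point

    buffered.flush()                    # conceptually what closing ensures
    complete = underlying.getvalue()    # now holds all 100 serialized records

    print(len(too_early), len(complete))

Closing (or at least flushing) the stream before deserializing the values
is what the patch linked above is about.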

On Fri, Sep 9, 2016 at 10:37 AM Josh Rosen <joshro...@databricks.com> wrote:

> cache() / persist() is definitely *not* supposed to affect the result of
> a program, so the behavior that you're seeing is unexpected.
>
> I'll try to reproduce this myself by caching in PySpark under heavy memory
> pressure, but in the meantime the following questions will help me to debug:
>
>    - Does this only happen in Spark 2.0? Have you successfully run the
>    same workload with correct behavior on an earlier Spark version, such as
>    1.6.x?
>    - How accurately does your example code model the structure of your
>    real code? Are you calling cache()/persist() on an RDD which has been
>    transformed in Python or are you calling it on an untransformed input RDD
>    (such as the RDD returned from sc.textFile() / sc.hadoopFile())?
>
>
> On Fri, Sep 9, 2016 at 5:01 AM Ben Leslie <be...@benno.id.au> wrote:
>
>> Hi,
>>
>> I'm trying to understand if there is any difference in correctness
>> between rdd.persist(pyspark.StorageLevel.MEMORY_ONLY) and
>> rdd.persist(pyspark.StorageLevel.MEMORY_AND_DISK).
>>
>> I can see that there may be differences in performance, but my
>> expectation was that using either would result in the same behaviour.
>> However, that is not what I'm seeing in practice.
>>
>> Specifically, I have some code like:
>>
>>     text_lines = sc.textFile(input_files)
>>     records = text_lines.map(json.loads)
>>     records.persist(pyspark.StorageLevel.MEMORY_ONLY)
>>     count = records.count()
>>     records.unpersist()
>>
>> When I do not use persist at all, the 'count' variable contains the
>> correct value.
>> When I use persist with pyspark.StorageLevel.MEMORY_AND_DISK, I also
>> get the correct, expected value.
>> However, if I use persist with no argument (or
>> pyspark.StorageLevel.MEMORY_ONLY) then the value of 'count' is too
>> small.
>>
>> In all cases the script completes without errors (or warnings, as far
>> as I can tell).
>>
>> I'm using Spark 2.0.0 on an AWS EMR cluster.
>>
>> It appears that the executors may not have enough memory to store all
>> the RDD partitions in memory; however, I thought that in this case it
>> would fall back to regenerating them from the parent RDD rather than
>> providing the wrong answer.
>>
>> Is this the expected behaviour? It seems a little difficult to work
>> with in practice.
>>
>> Cheers,
>>
>> Ben
>>
