Yongqin Xiao created SPARK-22828:
------------------------------------

             Summary: Data corruption happens when the same RDD is repeatedly 
used as a parent RDD of a custom RDD that reads each parent RDD in 
concurrent threads
                 Key: SPARK-22828
                 URL: https://issues.apache.org/jira/browse/SPARK-22828
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.1.0
            Reporter: Yongqin Xiao


I have defined a custom RDD (a minimal sketch follows this list) that 
- computes its output from the input data using our traditional data 
transformation code. As an extreme example, this custom RDD can behave as a 
union, joiner, etc. 
- takes one or more parent RDDs as input, where some or all of the parent RDDs 
can be the same RDD
- reads the input parent RDDs in concurrent threads (i.e. reader threads)
- computes the data in one or more transformation threads that run concurrently 
with the reader threads
- ...
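
For concreteness, here is a minimal sketch of such a custom RDD, assuming all 
parents share the same partitioning; the class name MyRdd and the union-like 
merge are illustrative, not our actual implementation:

{code:java}
import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Sketch of the custom RDD described above (names are illustrative).
// Assumes all parents share the same partitioning, so the same Partition
// object can be handed to each parent's iterator.
class MyRdd[T: ClassTag](parents: Seq[RDD[T]])
    extends RDD[T](parents.head.sparkContext,
                   parents.map(p => new OneToOneDependency(p))) {

  override protected def getPartitions: Array[Partition] =
    parents.head.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    // One reader thread per parent, each draining its parent's iterator
    // into a private buffer; this mirrors the concurrent reads that
    // trigger the corruption when several parents are the same RDD.
    val buffers = parents.map(_ => new ArrayBuffer[T]())
    val readers = parents.zip(buffers).map { case (parent, buf) =>
      new Thread(new Runnable {
        override def run(): Unit =
          parent.iterator(split, context).foreach(buf += _)
      })
    }
    readers.foreach(_.start())
    readers.foreach(_.join())
    buffers.iterator.flatMap(_.iterator) // union-like merge of the inputs
  }
}
{code}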

In certain cases, we see {color:red}data being corrupted{color} when our reader 
threads read it in. The corruption happens only when all of the following 
conditions are met:
- Multiple parent RDDs of the custom RDD are actually the same RDD, e.g. a 
same-source union, as in the snippet below.

{code:java}
// The Scala code is along these lines (simplified):
val rdd1 = ...
val customRdd = new MyRdd(rdd1, rdd1, ...)
{code}

- The parent RDD is not the result of repartitioning or sorting-within-partition.
- The shared parent RDD is not persisted.
- spark.sql.shuffle.partitions is set to 1. We saw the corruption as well when 
the value is set to a small number like 2, which is also the source partition 
count.

This data corruption happens even when the number of executors and cores is 
set to 1, meaning the corruption is not caused by multiple partitions running 
concurrently.
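
For reference, a sketch of the session configuration under which we reproduce 
it (the exact builder calls are illustrative; the property values come from 
the conditions above):

{code:java}
import org.apache.spark.sql.SparkSession

// Illustrative repro configuration: one executor, one core, and a
// shuffle partition count of 1.
val spark = SparkSession.builder()
  .appName("corruption-repro")
  .config("spark.executor.instances", "1")
  .config("spark.executor.cores", "1")
  .config("spark.sql.shuffle.partitions", "1")
  .getOrCreate()
{code}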


Data corruption doesn't happen when any one of the following conditions is met:
1. Instead of passing the same parent RDD multiple times as input to the custom 
RDD, we perform a select (of all columns) on that parent RDD and pass the 
distinct select RDDs as input.

{code:java}
// The Scala code is along these lines ($1, $2 stand for column references):
val rdd1 = ...
val customRdd = new MyRdd(rdd1.select($1, $2, ...), rdd1.select($1, $2), ...)
{code}

2. We persist the parent RDD:
{code:java}
val rdd1 = ...
rdd1.persist(...)
val customRdd = new MyRdd(rdd1, rdd1, ...)
{code}

3. We use a single thread to read the parent RDDs in the custom RDD 
implementation (see the sketch after this list).
4. We use our default value (100) for spark.sql.shuffle.partitions.
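
For workaround 3, a sketch of what the single-threaded variant of compute() in 
the MyRdd sketch above would look like (again illustrative, not our actual 
code):

{code:java}
// Hypothetical single-threaded variant of MyRdd.compute(): the parents
// are drained sequentially on the task thread, so no two iterators over
// the same parent RDD are advanced concurrently.
override def compute(split: Partition, context: TaskContext): Iterator[T] =
  parents.iterator.flatMap(parent => parent.iterator(split, context))
{code}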


