[ 
https://issues.apache.org/jira/browse/SPARK-22828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yongqin Xiao updated SPARK-22828:
---------------------------------
    Shepherd: Jiang Xingbo

> Data corruption happens when the same RDD is repeatedly used as a parent RDD of 
> a custom RDD which reads each parent RDD in concurrent threads
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22828
>                 URL: https://issues.apache.org/jira/browse/SPARK-22828
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Yongqin Xiao
>              Labels: spark
>
> I have defined a custom RDD that 
> - computes the output based on input data using our traditional data 
> transformation code. As an extreme example, this custom RDD can behave as a 
> union, a joiner, etc.
> - takes one or more parent RDDs as input, where some or all parent RDDs can 
> be the same
> - reads the input parent RDDs in concurrent threads (i.e. reader threads)
> - computes the data in one or more transformation threads that run 
> concurrently with the reader threads (see the sketch after this list)
> - ...
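> To make the reading pattern concrete, here is a minimal, hypothetical sketch of such an 
> RDD (this is not our actual code; the class name MyRdd matches the snippets below, but the 
> implementation, field names, and the assumption that all parents share the same partitioning 
> are mine). One reader thread drains each parent partition into a shared queue, and the 
> iterator returned by compute plays the role of the transformation thread:
> {code:java}
> import java.util.concurrent.{Executors, LinkedBlockingQueue}
> import scala.reflect.ClassTag
> import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
> import org.apache.spark.rdd.RDD
> 
> // Hypothetical sketch only: one reader thread per parent RDD, all feeding a queue.
> // Assumes every parent has the same partitioning (true in the bug scenario, where
> // several parents are the same RDD).
> class MyRdd[T: ClassTag](parents: Seq[RDD[T]])
>   extends RDD[T](parents.head.sparkContext, parents.map(new OneToOneDependency(_))) {
> 
>   override protected def getPartitions: Array[Partition] = parents.head.partitions
> 
>   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
>     val queue = new LinkedBlockingQueue[Option[T]]()
>     val pool = Executors.newFixedThreadPool(parents.size)
>     parents.foreach { parent =>
>       pool.submit(new Runnable {
>         override def run(): Unit = {
>           // Reader thread: iterate this parent's partition and push rows to the queue.
>           parent.iterator(split, context).foreach(row => queue.put(Some(row)))
>           queue.put(None) // end-of-stream marker for this reader
>         }
>       })
>     }
>     pool.shutdown()
>     // "Transformation" side: drain the queue until every reader has signalled completion.
>     new Iterator[T] {
>       private var finishedReaders = 0
>       private var buffered: Option[T] = advance()
>       private def advance(): Option[T] = {
>         var row: Option[T] = None
>         while (row.isEmpty && finishedReaders < parents.size) {
>           queue.take() match {
>             case some @ Some(_) => row = some
>             case None           => finishedReaders += 1
>           }
>         }
>         row
>       }
>       override def hasNext: Boolean = buffered.isDefined
>       override def next(): T = { val r = buffered.get; buffered = advance(); r }
>     }
>   }
> }
> {code}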
> In certain cases, we see {color:red}data being corrupted{color} when our 
> reader threads read the parent RDDs in. The corruption happens when all of the 
> following conditions are met:
> - Multiple parent RDDs of the custom RDD are actually the same RDD, e.g. a 
> same-source union.
> The Scala code is roughly like this:
> {code:java}
> val rdd1 = ...
> val customRdd = new MyRdd(rdd1, rdd1, ...)
> {code}
> - The parent RDD is not the result of a repartitioning or a 
> sort-within-partitions.
> - The shared parent RDD is not persisted.
> - spark.sql.shuffle.partitions is set to 1. We saw corruption as well when 
> the value is set to a small value such as 2, which is also the source 
> partition count.
> This data corruption happens even when the number of executors and cores is 
> set to 1, meaning the corruption is not related to multiple partitions running 
> concurrently.
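> For reference, a minimal configuration sketch of the kind of setup under which we 
> reproduce the corruption (the exact application settings are ours, so treat this 
> purely as an illustration):
> {code:java}
> import org.apache.spark.sql.SparkSession
> 
> // Illustrative settings only: single executor, single core, one shuffle partition.
> val spark = SparkSession.builder()
>   .appName("custom-rdd-repro")
>   .config("spark.executor.instances", "1")
>   .config("spark.executor.cores", "1")
>   .config("spark.sql.shuffle.partitions", "1")  // corruption also seen with small values like 2
>   .getOrCreate()
> {code}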
> Data corruption doesn't happen when any one of the following conditions is met:
> 1. Instead of passing the same parent RDD as multiple inputs to my custom RDD, 
> we do a select (of all columns) on that parent RDD and use the distinct select 
> RDDs as inputs.
> The Scala code is like this:
> {code:java}
> val rdd1 = ...
> val customRdd = new MyRdd(rdd1.select($1, $2, ...), rdd1.select($1, $2), ...)
> {code}
> 2. We persist the parent RDD:
> {code:java}
> val rdd1 = ...
> rdd1.persist(...)
> val customRdd = new MyRdd(rdd1, rdd1, ...)
> {code}
> 3. We use a single thread to read the parent RDDs in the custom RDD implementation 
> (sketched below).
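> A hypothetical single-threaded variant of the compute method from the sketch above 
> (same assumed class and names) would simply read the parent partitions one after 
> another on the task thread, with no reader threads or queue:
> {code:java}
> // Hypothetical single-threaded alternative to MyRdd.compute from the sketch above.
> override def compute(split: Partition, context: TaskContext): Iterator[T] =
>   parents.iterator.flatMap(parent => parent.iterator(split, context))
> {code}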
> 4. We use our default value (100) for spark.sql.shuffle.partitions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
