[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500997#comment-17500997
 ] 

Xingbo Jiang commented on SPARK-38388:
--------------------------------------

Have you tried override the `getOutputDeterministicLevel` function in your 
customized RDD that generated the upstream data? If the data could be 
non-deterministic, you can return DeterministicLevel.INDETERMINATE, thus every 
time a children stage need to retry, it restarts tasks for all the partitions 
(not just the partitions that are lost/failed).

> Repartition + Stage retries could lead to incorrect data 
> ---------------------------------------------------------
>
>                 Key: SPARK-38388
>                 URL: https://issues.apache.org/jira/browse/SPARK-38388
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0, 3.1.1
>         Environment: Spark 2.4 and 3.1
>            Reporter: Jason Xu
>            Priority: Major
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record binaries. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 1000000:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
>     throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to