[
https://issues.apache.org/jira/browse/SPARK-51756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tengfei Huang updated SPARK-51756:
----------------------------------
Parent: (was: SPARK-51166)
Issue Type: Task (was: Sub-task)
> Non-deterministic stage could lead to incorrect results with partial retry
> --------------------------------------------------------------------------
>
> Key: SPARK-51756
> URL: https://issues.apache.org/jira/browse/SPARK-51756
> Project: Spark
> Issue Type: Task
> Components: Scheduler
> Affects Versions: 4.0.0
> Reporter: Jiexing Li
> Assignee: Jiexing Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.1.0
>
> Attachments: example 1.png, example 2.png, example 3.png, example
> 4.png
>
>
> Spark's resilience features can cause an RDD to be partially recomputed, e.g.
> when an executor is lost due to downscaling or a spot instance kill. When the
> output of a non-deterministic task is recomputed, Spark does not always
> recompute everything that depends on that task's output. In some cases, some
> subsequent computations are based on the output of one "attempt" of the task,
> while other subsequent computations are based on another "attempt".
> This is problematic when the producer stage is non-deterministic: the second
> attempt of the same task can produce output that is very different from the
> first. For example, if the stage uses round-robin partitioning, some of the
> output data could be placed in different partitions by different task
> attempts. This can lead to incorrect results unless we retry the whole
> consumer stage that depends on the retried non-deterministic stage. Below is
> an example.
> *Example:*
> Let's say we have Stage 1 and Stage 2, where Stage 1 is the producer and
> Stage 2 is the consumer. Assume that the data produced by Task 2 of Stage 1
> is lost for some reason while Stage 2 is executing. Further assume that at
> this point, Task 1 of Stage 2 has already fetched all its inputs and
> finished, while Task 2 of Stage 2 fails with fetch failures (see example 1 in
> the attachments).
> Task 2 of Stage 1 is retried to reproduce the data, after which Task 2 of
> Stage 2 is retried. Eventually, Task 1 and Task 2 of Stage 2 produce a
> result containing all 4 tuples \{t1, t2, t3, t4} (see example 2 in the
> attachments).
>
> Now, let's assume that Stage 1 is non-deterministic (e.g., it uses
> round-robin partitioning and its input data is not ordered), and that Task 2
> places tuple t3 in Partition 1 and tuple t4 in Partition 2 in its first
> attempt, but t4 in Partition 1 and t3 in Partition 2 in its second attempt.
> When Task 2 of Stage 2 is retried, instead of reading \{t2, t4} as it
> should, it reads \{t2, t3} as its input. The result generated by Stage 2 is
> then \{t1, t2, t3, t3}, which is incorrect. (See example 3 in the
> attachments.)
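> The mechanism behind this scenario can be sketched outside Spark. The
> following is a plain-Python simulation (not Spark code; the function name is
> made up for illustration) of how a round-robin partitioner, which assigns
> tuples by their position in the input, sends the same tuples to different
> partitions when two attempts see an unordered input in different orders:

```python
# Plain-Python sketch (not the Spark API): a round-robin partitioner
# assigns each tuple to a partition by its position in the input stream,
# so two attempts that iterate the input in different orders disagree.

def round_robin_partition(tuples, num_partitions):
    """Assign each tuple to partition (index mod num_partitions)."""
    partitions = {p: [] for p in range(num_partitions)}
    for i, t in enumerate(tuples):
        partitions[i % num_partitions].append(t)
    return partitions

# Task 2 of Stage 1 sees its unordered input in a different order on retry.
first_attempt  = round_robin_partition(["t3", "t4"], 2)
second_attempt = round_robin_partition(["t4", "t3"], 2)

print(first_attempt)   # {0: ['t3'], 1: ['t4']}
print(second_attempt)  # {0: ['t4'], 1: ['t3']}
# A consumer task that reads the same partition before and after the retry
# sees different tuples, which is exactly the inconsistency described above.
```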
>
> The problem can be avoided if we retry all tasks of Stage 2. Since all tasks
> then read consistent data, the result is correct regardless of how the retry
> of Stage 1 Task 2 partitions the data. (See example 4 in the attachments.)
> *Proposal:*
> To avoid correctness issues produced by a non-deterministic stage with
> partial retry, we propose an approach that first tries to detect
> inconsistent data generated by different task attempts of a
> non-deterministic stage: for example, whether all the data partitions
> generated by Task 2 in the first attempt are the same as those generated by
> the second attempt. If inconsistent data is detected, we retry the entire
> consumer stages.
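> One conceivable way to implement such detection (a hypothetical sketch, not
> the actual Spark patch; all names here are invented for illustration) is to
> record a checksum of each map output partition per attempt and compare the
> retried attempt's checksums against the original's:

```python
# Hypothetical detection sketch: checksum each output partition per attempt
# and trigger a full consumer-stage retry if any partition's content changed.
import hashlib

def partition_checksums(partitions):
    """Checksum each partition's rows; rows are concatenated in order."""
    return {p: hashlib.sha256("".join(rows).encode()).hexdigest()
            for p, rows in partitions.items()}

def needs_full_consumer_retry(old_partitions, new_partitions):
    """True if the retried attempt's output differs from the original's."""
    return partition_checksums(old_partitions) != partition_checksums(new_partitions)

first  = {1: ["t3"], 2: ["t4"]}   # Task 2, first attempt
second = {1: ["t4"], 2: ["t3"]}   # Task 2, retry swaps t3 and t4

print(needs_full_consumer_retry(first, second))  # True -> retry all of Stage 2
```

> Comparing per-partition checksums rather than raw data keeps the check cheap
> enough to run in the scheduler, at the cost of ignoring row order within a
> partition only if the checksum is made order-insensitive.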
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]