GitHub user bersprockets opened a pull request:

    https://github.com/apache/spark/pull/22382

    [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.repartition() data 
correctness issue

    ## What changes were proposed in this pull request?
    
    Back port of #22354 and #17955 to 2.2 (#22354 depends on methods introduced 
by #17955).
    
    -------
    
    An alternative fix for #21698
    
    When Spark rerun tasks for an RDD, there are 3 different behaviors:
    1. determinate. Always return the same result with same order when rerun.
    2. unordered. Returns same data set in random order when rerun.
    3. indeterminate. Returns different result when rerun.
    
    Normally Spark doesn't need to care about it. Spark runs stages one by one, 
when a task is failed, just rerun it. Although the rerun task may return a 
different result, users will not be surprised.
    
    However, Spark may rerun a finished stage when seeing fetch failures. When 
this happens, Spark needs to rerun all the tasks of all the succeeding stages 
if the RDD output is indeterminate, because the input of the succeeding stages 
has been changed.
    
    If the RDD output is determinate, we only need to rerun the failed tasks of 
the succeeding stages, because the input doesn't change.
    
    If the RDD output is unordered, it's same as determinate, because shuffle 
partitioner is always deterministic(round-robin partitioner is not a shuffle 
partitioner that extends `org.apache.spark.Partitioner`), so the reducers will 
still get the same input data set.
    
    This PR fixed the failure handling for `repartition`, to avoid correctness 
issues.
    
    For `repartition`, it applies a stateful map function to generate a 
round-robin id, which is order sensitive and makes the RDD's output 
indeterminate. When the stage contains `repartition` reruns, we must also rerun 
all the tasks of all the succeeding stages.
    
    **future improvement:**
    1. Currently we can't rollback and rerun a shuffle map stage, and just 
fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
    2. Currently we can't rollback and rerun a result stage, and just fail. We 
should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
    3. We should provide public API to allow users to tag the random level of 
the RDD's computing function.
    
    ## How was this patch tested?
    
    a new test case


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bersprockets/spark SPARK-23243-2.2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22382.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22382
    
----
commit 97ba5a71e1903e0462bfac3201f1961e0c14f384
Author: Wenchen Fan <wenchen@...>
Date:   2018-09-07T02:52:45Z

    [SPARK-23243][CORE][2.2] Fix RDD.repartition() data correctness issue
    
    backport https://github.com/apache/spark/pull/22112 to 2.2
    
    -------
    
    An alternative fix for https://github.com/apache/spark/pull/21698
    
    When Spark rerun tasks for an RDD, there are 3 different behaviors:
    1. determinate. Always return the same result with same order when rerun.
    2. unordered. Returns same data set in random order when rerun.
    3. indeterminate. Returns different result when rerun.
    
    Normally Spark doesn't need to care about it. Spark runs stages one by one, 
when a task is failed, just rerun it. Although the rerun task may return a 
different result, users will not be surprised.
    
    However, Spark may rerun a finished stage when seeing fetch failures. When 
this happens, Spark needs to rerun all the tasks of all the succeeding stages 
if the RDD output is indeterminate, because the input of the succeeding stages 
has been changed.
    
    If the RDD output is determinate, we only need to rerun the failed tasks of 
the succeeding stages, because the input doesn't change.
    
    If the RDD output is unordered, it's same as determinate, because shuffle 
partitioner is always deterministic(round-robin partitioner is not a shuffle 
partitioner that extends `org.apache.spark.Partitioner`), so the reducers will 
still get the same input data set.
    
    This PR fixed the failure handling for `repartition`, to avoid correctness 
issues.
    
    For `repartition`, it applies a stateful map function to generate a 
round-robin id, which is order sensitive and makes the RDD's output 
indeterminate. When the stage contains `repartition` reruns, we must also rerun 
all the tasks of all the succeeding stages.
    
    **future improvement:**
    1. Currently we can't rollback and rerun a shuffle map stage, and just 
fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
    2. Currently we can't rollback and rerun a result stage, and just fail. We 
should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
    3. We should provide public API to allow users to tag the random level of 
the RDD's computing function.
    
    a new test case
    
    Closes #22354 from cloud-fan/repartition.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>

commit 035fbb7eff038474b2637828ffaa0dd066aa44f4
Author: Bruce Robbins <bersprockets@...>
Date:   2018-09-07T22:34:09Z

    compilation error

commit bcc43d1b68df61464a5910d50a22d010928a3ead
Author: Josh Rosen <joshrosen@...>
Date:   2017-06-12T01:34:12Z

    [SPARK-20715] Store MapStatuses only in MapOutputTracker, not 
ShuffleMapStage
    
    ## What changes were proposed in this pull request?
    
    This PR refactors `ShuffleMapStage` and `MapOutputTracker` in order to 
simplify the management of `MapStatuses`, reduce driver memory consumption, and 
remove a potential source of scheduler correctness bugs.
    
    ### Background
    
    In Spark there are currently two places where MapStatuses are tracked:
    
    - The `MapOutputTracker` maintains an `Array[MapStatus]` storing a single 
location for each map output. This mapping is used by the `DAGScheduler` for 
determining reduce-task locality preferences (when locality-aware reduce task 
scheduling is enabled) and is also used to serve map output locations to 
executors / tasks.
    - Each `ShuffleMapStage` also contains a mapping of 
`Array[List[MapStatus]]` which holds the complete set of locations where each 
map output could be available. This mapping is used to determine which map 
tasks need to be run when constructing `TaskSets` for the stage.
    
    This duplication adds complexity and creates the potential for certain 
types of correctness bugs.  Bad things can happen if these two copies of the 
map output locations get out of sync. For instance, if the `MapOutputTracker` 
is missing locations for a map output but `ShuffleMapStage` believes that 
locations are available then tasks will fail with 
`MetadataFetchFailedException` but `ShuffleMapStage` will not be updated to 
reflect the missing map outputs, leading to situations where the stage will be 
reattempted (because downstream stages experienced fetch failures) but no task 
sets will be launched (because `ShuffleMapStage` thinks all maps are available).
    
    I observed this behavior in a real-world deployment. I'm still not quite 
sure how the state got out of sync in the first place, but we can completely 
avoid this class of bug if we eliminate the duplicate state.
    
    ### Why we only need to track a single location for each map output
    
    I think that storing an `Array[List[MapStatus]]` in `ShuffleMapStage` is 
unnecessary.
    
    First, note that this adds memory/object bloat to the driver we need one 
extra `List` per task. If you have millions of tasks across all stages then 
this can add up to be a significant amount of resources.
    
    Secondly, I believe that it's extremely uncommon that these lists will ever 
contain more than one entry. It's not impossible, but is very unlikely given 
the conditions which must occur for that to happen:
    
    - In normal operation (no task failures) we'll only run each task once and 
thus will have at most one output.
    - If speculation is enabled then it's possible that we'll have multiple 
attempts of a task. The TaskSetManager will [kill duplicate attempts of a 
task](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L717)
 after a task finishes successfully, reducing the likelihood that both the 
original and speculated task will successfully register map outputs.
    - There is a [comment in 
`TaskSetManager`](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L113)
 which suggests that running tasks are not killed if a task set becomes a 
zombie. However:
      - If the task set becomes a zombie due to the job being cancelled then it 
doesn't matter whether we record map outputs.
      - If the task set became a zombie because of a stage failure (e.g. the 
map stage itself had a fetch failure from an upstream match stage) then I 
believe that the "failedEpoch" will be updated which may cause map outputs from 
still-running tasks to [be 
ignored](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1213).
 (I'm not 100% sure on this point, though).
    - Even if you _do_ manage to record multiple map outputs for a stage, only 
a single map output is reported to / tracked by the MapOutputTracker. The only 
situation where the additional output locations could actually be read or used 
would be if a task experienced a `FetchFailure` exception. The most likely 
cause of a `FetchFailure` exception is an executor lost, which will have most 
likely caused the loss of several map tasks' output, so saving on potential 
re-execution of a single map task isn't a huge win if we're going to have to 
recompute several other lost map outputs from other tasks which ran on that 
lost executor. Also note that the re-population of MapOutputTracker state from 
state in the ShuffleMapTask only happens after the reduce stage has failed; the 
additional location doesn't help to prevent FetchFailures but, instead, can 
only reduce the amount of work when recomputing missing parent stages.
    
    Given this, this patch chooses to do away with tracking multiple locations 
for map outputs and instead stores only a single location. This change removes 
the main distinction between the `ShuffleMapTask` and `MapOutputTracker`'s 
copies of this state, paving the way for storing it only in the 
`MapOutputTracker`.
    
    ### Overview of other changes
    
    - Significantly simplified the cache / lock management inside of the 
`MapOutputTrackerMaster`:
      - The old code had several parallel `HashMap`s which had to be guarded by 
maps of `Object`s which were used as locks. This code was somewhat complicated 
to follow.
      - The new code uses a new `ShuffleStatus` class to group together all of 
the state associated with a particular shuffle, including cached serialized map 
statuses, significantly simplifying the logic.
    - Moved more code out of the shared `MapOutputTracker` abstract base class 
and into the `MapOutputTrackerMaster` and `MapOutputTrackerWorker` subclasses. 
This makes it easier to reason about which functionality needs to be supported 
only on the driver or executor.
    - Removed a bunch of code from the `DAGScheduler` which was used to 
synchronize information from the `MapOutputTracker` to `ShuffleMapStage`.
    - Added comments to clarify the role of `MapOutputTrackerMaster`'s `epoch` 
in invalidating executor-side shuffle map output caches.
    
    I will comment on these changes via inline GitHub review comments.
    
    /cc hvanhovell and rxin (whom I discussed this with offline), tgravescs 
(who recently worked on caching of serialized MapOutputStatuses), and 
kayousterhout and markhamstra (for scheduler changes).
    
    ## How was this patch tested?
    
    Existing tests. I purposely avoided making interface / API which would 
require significant updates or modifications to test code.
    
    Author: Josh Rosen <joshro...@databricks.com>
    
    Closes #17955 from JoshRosen/map-output-tracker-rewrite.

commit efd58ed96c1a8caeb2fa8da8d3300703ead047f6
Author: Bruce Robbins <bersprockets@...>
Date:   2018-09-08T18:18:44Z

    Remove extraneous blank line added during conflict resolution

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to