[ https://issues.apache.org/jira/browse/SPARK-24955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

San Tung updated SPARK-24955:
-----------------------------
    Description: 
We've recently run into a few instances where a downed node has led to incomplete data and, in turn, correctness issues; we can reproduce this some of the time.

*Setup:*
 - we're currently on Spark 2.3.0
 - we allow retries on failed tasks and stages (a minimal configuration sketch follows this list)
 - we use PySpark to perform these operations
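
To illustrate what we mean by allowing retries, here is a minimal PySpark configuration sketch; the app name and the specific values are illustrative, not our exact settings:

{code:python}
from pyspark import SparkConf, SparkContext

conf = (
    SparkConf()
    .setAppName("hash-subtract-job")  # hypothetical app name
    # Allow each task to be re-attempted a few times before the stage fails.
    .set("spark.task.maxFailures", "4")
    # Allow a stage to be resubmitted a few times after fetch failures.
    .set("spark.stage.maxConsecutiveAttempts", "4")
)
sc = SparkContext(conf=conf)
{code}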

*Stages:*

Simplistically, the job does the following (a simplified PySpark sketch follows this list):
 - Stage 1/2: computes a number of `(sha256 hash, 0, 1)` partitioned into 65536 
partitions
 - Stage 3/4: computes a number of `(sha256 hash, 1, 0)` partitioned into 6408 
partitions (one hash may exist in multiple partitions)
 - Stage 5:
 ** repartitions stage 2 and stage 4 by the first 2 bytes of each hash, and finds which ones are not in common (stage 2 hashes - stage 4 hashes).
 ** stores each resulting partition in a persistent data source.
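
A simplified sketch of this job shape, where compute_new_hashes, compute_existing_hashes, and write_partition are hypothetical stand-ins for our real sources and sink, and sc is the SparkContext from the setup sketch above:

{code:python}
def prefix_bucket(sha_hex):
    # First 2 bytes of the hash = first 4 hex characters -> 0..65535.
    return int(sha_hex[:4], 16)

# Stages 1/2: (sha256 hash, 0, 1) tuples in 65536 partitions.
new_hashes = (sc.parallelize(compute_new_hashes(), 65536)
                .map(lambda h: (h, 0, 1)))

# Stages 3/4: (sha256 hash, 1, 0) tuples in 6408 partitions;
# one hash may appear in more than one partition.
existing_hashes = (sc.parallelize(compute_existing_hashes(), 6408)
                     .map(lambda h: (h, 1, 0)))

# Stage 5: repartition both sides by the first 2 bytes of each hash,
# keep the stage 2 hashes that do not appear in stage 4, and persist
# each resulting partition.
left = new_hashes.map(lambda r: (r[0], None)).partitionBy(65536, prefix_bucket)
right = existing_hashes.map(lambda r: (r[0], None)).partitionBy(65536, prefix_bucket)
missing = left.subtractByKey(right).keys()
missing.foreachPartition(write_partition)
{code}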

*Failure Scenario:*
 - We take out one of the machines (do a forced shutdown, for example)
 - For some tasks, stage 5 will die immediately with one of the following:
 ** `ExecutorLostFailure (executor 24 exited caused by one of the running 
tasks) Reason: worker lost`
 ** `FetchFailed(BlockManagerId(24, [redacted], 36829, None), shuffleId=2, 
mapId=14377, reduceId=48402, message=`
 - these tasks are then reused to recalculate the stage 1-2 and 3-4 data that was lost with the downed node, which Spark recomputes correctly.
 - However, some tasks still continue executing from Stage 5, seemingly missing stage 4 data, and dump incorrect data to the stage 5 data source. We noticed the subtract operation taking ~1-2 minutes after the machine goes down and storing a lot more data than usual (which on inspection is wrong; see the sanity-check sketch after this list).
 - we've seen this happen with slightly different execution plans too, ones that don't involve or-ing, but they end up as some variant of missing stage 4 data.
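
As a point of reference, a hypothetical guard along the following lines (roughly how we spot the problem by hand) could flag the inflated output before the stage 5 write; expected_max_output is an illustrative threshold taken from previous healthy runs, not something Spark provides:

{code:python}
# `missing` is the subtract result from the sketch above; the threshold
# below is hypothetical and would come from previous healthy runs.
result = missing.cache()
result_count = result.count()

if result_count > expected_max_output:
    # Missing stage 4 shuffle data means too few hashes get subtracted
    # out, so an unusually large result is a red flag.
    raise RuntimeError(
        "stage 5 produced %d hashes, expected at most %d; "
        "refusing to persist possibly incomplete subtract output"
        % (result_count, expected_max_output))

result.foreachPartition(write_partition)
{code}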

However, we cannot reproduce this consistently - sometimes all tasks fail gracefully. When the downed node is handled correctly, all of these tasks fail and the stage 1-2/3-4 work is redone. Note that this job produces the correct results as long as the machines stay alive!

We were wondering if a machine going down can result in a state where a task keeps executing even though not all of its data has been fetched, which would give us incorrect results (or if there is a setting that allows this - we tried scanning the Spark configs up and down). This seems similar to https://issues.apache.org/jira/browse/SPARK-24160 (maybe we get an empty packet?), but it doesn't look like that change was explicitly resolving any known bug.

> spark continuing to execute on a task despite not reading all data from a 
> downed machine
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-24955
>                 URL: https://issues.apache.org/jira/browse/SPARK-24955
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Shuffle
>    Affects Versions: 2.3.0
>            Reporter: San Tung
>            Priority: Major