[ https://issues.apache.org/jira/browse/SPARK-24955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
San Tung updated SPARK-24955:
-----------------------------
    Description:
We've recently run into a few instances where a downed node has led to incomplete data, causing correctness issues, which we can reproduce some of the time.

*Setup:*
- we're currently on Spark 2.3.0
- we allow retries on failed tasks and stages
- we use PySpark to perform these operations

*Stages:*
Simplistically, the job does the following:
- Stage 1/2: computes a number of `(sha256 hash, 0, 1)` records partitioned into 65536 partitions
- Stage 3/4: computes a number of `(sha256 hash, 1, 0)` records partitioned into 6408 partitions (one hash may exist in multiple partitions)
- Stage 5:
** repartitions stage 2 and stage 4 by the first 2 bytes of each hash, and finds the hashes that are not in common (stage 2 hashes - stage 4 hashes).
** stores this partition into a persistent data source.

*Failure Scenario:*
- We take out one of the machines (do a forced shutdown, for example)
- For some tasks, stage 5 will die immediately with one of the following:
** `ExecutorLostFailure (executor 24 exited caused by one of the running tasks) Reason: worker lost`
** `FetchFailed(BlockManagerId(24, [redacted], 36829, None), shuffleId=2, mapId=14377, reduceId=48402, message=`
- These tasks are then reused to recompute the stage 1-2 and 3-4 output that was missing on the downed node, which Spark recalculates correctly.
- However, some tasks still continue executing from stage 5, seemingly missing stage 4 data, and dump incorrect data to the stage 5 data source. We noticed the subtract operation taking ~1-2 minutes after the machine goes down, and storing a lot more data than usual (which on inspection is wrong).
- We've seen this happen with slightly different execution plans too, which don't involve or-ing but end up being some variant of missing some stage 4 data.

However, we cannot reproduce this consistently - sometimes all tasks fail gracefully. When a downed node is handled correctly, all of these tasks fail and stages 1-2/3-4 are reworked.
Note that this job produces the correct results if the machines stay alive!

We were wondering whether a machine going down can result in a state where a task keeps executing even though not all of its data has been fetched, which would give us incorrect results (or whether there is a setting that allows this - we tried scanning the Spark configs up and down). This seems similar to https://issues.apache.org/jira/browse/SPARK-24160 (maybe we get an empty packet?), but it doesn't look like that change was made to explicitly resolve any known bug.

> spark continuing to execute on a task despite not reading all data from a downed machine
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-24955
>                 URL: https://issues.apache.org/jira/browse/SPARK-24955
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Shuffle
>    Affects Versions: 2.3.0
>            Reporter: San Tung
>            Priority: Major
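To make the suspected failure mode in the description concrete, here is a minimal sketch in plain Python (not PySpark, and not a reproduction of the bug). It assumes hypothetical inputs and helper names (`partition_id`, `repartition`, `subtract`, `sha`) that do not come from the actual job. It only illustrates why a stage 5 subtract that silently sees no stage 4 data for some partitions would store more rows than usual, all of them wrong.

```python
import hashlib
from collections import defaultdict

# Partitioning by the first 2 bytes of a hash gives 2**16 possible keys.
NUM_PARTITIONS = 65536


def partition_id(digest: bytes) -> int:
    """Map a sha256 digest to a partition id using its first 2 bytes."""
    return int.from_bytes(digest[:2], "big")


def repartition(digests):
    """Group digests by partition id (stand-in for the stage 5 repartition)."""
    parts = defaultdict(set)
    for d in digests:
        parts[partition_id(d)].add(d)
    return parts


def subtract(left_parts, right_parts):
    """Per-partition set difference: hashes in left that are not in right."""
    out = set()
    for pid, left in left_parts.items():
        out |= left - right_parts.get(pid, set())
    return out


def sha(i: int) -> bytes:
    """Hypothetical record generator: sha256 of a small integer."""
    return hashlib.sha256(str(i).encode()).digest()


# Hypothetical inputs: stage 4 covers a subset of the stage 2 hashes.
stage2 = [sha(i) for i in range(1000)]
stage4 = [sha(i) for i in range(200, 1000)]

left = repartition(stage2)
right = repartition(stage4)

# Healthy run: every reducer sees all stage 4 data for its partition.
expected = subtract(left, right)

# Assumed failure: half of the stage 4 partitions arrive empty, with no error.
lost = set(sorted(right)[: len(right) // 2])
right_partial = {pid: s for pid, s in right.items() if pid not in lost}
actual = subtract(left, right_partial)

print(len(expected), len(actual))
```

In this sketch the degraded result is always a strict superset of the healthy one, which matches the symptom of the subtract storing a lot more data than usual. Whether Spark can actually hand a reducer an empty or partial shuffle input in this way is exactly the open question of this issue.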