Thanks Mich. In a nutshell: if a FetchFailedException occurs due to a data node reboot, it can create duplicate or missing data. So this is more of a hardware/environment issue than a Spark issue.
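For anyone hitting the same issue: one cheap way to detect the damage before deciding on a rerun is a post-job validation pass, roughly along these lines (a sketch only; the paths, the app name, and the "key" column are hypothetical):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().appName("post-run-validation").getOrCreate()

    // Hypothetical input and output locations of the job under suspicion.
    val source = spark.read.parquet("hdfs:///data/input")
    val output = spark.read.parquet("hdfs:///data/job_output")

    // First-order check for missing data: compare row counts.
    // (Equal counts do not prove correctness, but a mismatch proves damage.)
    val missing = source.count() - output.count()

    // Check for duplicates: any key appearing more than once in the output,
    // assuming "key" should be unique per row.
    val dups = output.groupBy("key").count().filter(col("count") > 1).count()

    println(s"missing rows: $missing, duplicated keys: $dups")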
On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> It seems to me that there are issues related to the below:
>
> *<Prem> I think when a task failed in between and a retry task started and completed, it may create duplicates, as the failed task has some data + the retry task has the full data. But my question is why Spark keeps the delta data. Or, according to you, if the speculative and the original task both complete, Spark generally kills one of the tasks to get rid of the duplicate data. When a data node is rebooted, Spark's fault tolerance should go to other nodes, shouldn't it? Then why is there missing data?*
>
> Spark is designed to be fault-tolerant through lineage and recomputation. However, there are scenarios where speculative execution or task retries might lead to duplicated or missing data. So what are these?
>
> - Task Failure and Retry: You are correct that a failed task might have processed some data before encountering the FetchFailedException. If a retry succeeds, it processes the entire data partition again, which can lead to duplicates. When a task fails, Spark may recover the lost data by recomputing the task on another node. The output of the retried task is typically combined with the output of the original task during the final stage of the computation; this is done to handle scenarios where the original task partially completed and generated some output before failing. Spark does not intentionally store partially processed data. However, due to retries and speculative execution, duplicate processing can occur. To the best of my knowledge, Spark itself does not have a mechanism to identify and eliminate duplicates automatically. While Spark might sometimes kill speculative tasks once the original one finishes, this is not guaranteed behaviour; it depends on factors like scheduling and task dependencies.
>
> - Speculative Execution: Spark supports speculative execution, where the same task is launched on multiple executors simultaneously. The result of the first completed task is used, and the others are usually killed to avoid duplicated results. However, speculative execution might still introduce some duplication in the final output if tasks on different executors both complete successfully.
>
> - Node Reboots and Fault Tolerance: If the data node reboot leads to data corruption or loss, that data might be unavailable to Spark. Even with fault tolerance, Spark cannot recover completely missing data. Fault tolerance focuses on recovering from issues like executor failures, not data loss on storage nodes. Overall, Spark's fault tolerance is designed to handle executor failures by rescheduling tasks on other available executors, and temporary network issues by retrying fetches based on configuration.
>
> Here are some things to consider (a minimal sketch of the first two items follows this list):
>
> - Minimize retries: Adjust spark.shuffle.io.maxRetries to a lower value such as 1 or 2 to reduce the chance of duplicate processing attempts, if retries are suspected to be a source.
> - Disable speculative execution if needed: Consider disabling speculative execution (spark.speculation=false) if duplicates are a major concern. However, this might impact performance.
> - Data persistence: As mentioned in the previous reply, persist intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity is critical. This ensures data availability even during node failures.
> - Data validation checks: Implement data validation checks after processing to identify potential duplicates or missing data.
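> For illustration, a minimal sketch (untested; the app name is made up and the values are examples, not blanket recommendations) of how the first two items could be applied when building a session:
>
>     import org.apache.spark.sql.SparkSession
>
>     val spark = SparkSession.builder()
>       .appName("fetch-failure-mitigations")
>       // Fail faster rather than re-fetching from a node that is mid-reboot.
>       .config("spark.shuffle.io.maxRetries", "2")
>       .config("spark.shuffle.io.retryWait", "30s")
>       // Trade some performance for fewer duplicate-producing task attempts.
>       .config("spark.speculation", "false")
>       .getOrCreate()
>
> The same settings can equally be passed as --conf flags to spark-submit.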
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>
> On Sat, 2 Mar 2024 at 01:43, Prem Sahoo <prem.re...@gmail.com> wrote:
>
>> Hello Mich,
>> thanks for your reply.
>>
>> As an engineer I can chip in. You may have partial execution and retries, meaning that when Spark encounters a *FetchFailedException*, it may retry fetching the data from the unavailable (the one being rebooted) node a few times before marking it permanently unavailable. However, if the rebooted node recovers quickly within this retry window, some executors might successfully fetch the data after a retry. *This leads to duplicate processing of the same data partition*.
>>
>> <Prem> The data node reboot takes more than 20 minutes and our config is spark.network.timeout=300s, so we don't have duplicates for the above reason.
>>
>> I am not sure this one applies to your Spark version, but Spark may speculatively execute tasks on different executors to improve performance. If a task fails due to the *FetchFailedException*, a speculative task might be launched on another executor. This is where the fun and games start. If the unavailable node recovers before the speculative task finishes, both the original and the speculative task might complete successfully, *resulting in duplicates*. With regard to missing data: if the data node reboot leads to data corruption or loss, some data partitions might be completely unavailable. In this case, Spark may skip processing that missing data, leading to missing data in the final output.
>>
>> <Prem> I think when a task failed in between and a retry task started and completed, it may create duplicates, as the failed task has some data + the retry task has the full data. But my question is why Spark keeps the delta data. Or, according to you, if the speculative and the original task both complete, Spark generally kills one of the tasks to get rid of the duplicate data. When a data node is rebooted, Spark's fault tolerance should go to other nodes, shouldn't it? Then why is there missing data?
>>
>> Potential remedies: Spark offers some features to mitigate these issues, but it might not guarantee complete elimination of duplicates or data loss. You can adjust parameters like *spark.shuffle.io.retryWait* and *spark.speculation* to control retry attempts and speculative execution behavior. Lineage tracking is there to help: Spark can track data lineage, allowing you to identify potentially corrupted or missing data in some cases. You can consider persisting intermediate results to reliable storage (like HDFS, GCS, or another cloud storage) to avoid data loss in case of node failures. Your mileage varies, as it adds additional processing overhead, but it can ensure data integrity.
>>
>> <Prem> How will Spark handle these without a checkpoint, as checkpointing will slow down the process? The data loss or duplication I see is due to a FetchFailedException caused by a data node reboot. I have a few configs to minimize FetchFailedException:
>>
>> spark.network.timeout=300s
>> spark.reducer.maxReqsInFlight=4
>> spark.shuffle.io.retryWait=30s
>> spark.shuffle.io.maxRetries=3
>>
>> When we get a FetchFailedException due to a data node reboot, Spark should handle it gracefully, shouldn't it? Or how should we handle it?
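>> To make the checkpoint trade-off concrete, a minimal sketch (the paths and the "key" column are assumptions; note that Dataset.checkpoint() is eager by default, which is exactly the slowdown being worried about here):
>>
>>     import org.apache.spark.sql.SparkSession
>>
>>     val spark = SparkSession.builder().appName("checkpoint-sketch").getOrCreate()
>>
>>     // Reliable checkpoints must go to fault-tolerant storage.
>>     spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
>>
>>     val expensive = spark.read.parquet("hdfs:///data/input")
>>       .groupBy("key")   // assumed column
>>       .count()
>>
>>     // Materialises the result and truncates lineage, so a later
>>     // FetchFailedException cannot force this stage to be recomputed.
>>     val durable = expensive.checkpoint()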
>> On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Your point -> "When a Spark job shows a FetchFailedException it creates some duplicate data and we also see some data missing; please explain why. We have a scenario where the Spark job complains of a *FetchFailedException* because one of the data nodes got rebooted in the middle of the job run."
>>>
>>> As an engineer I can chip in. You may have partial execution and retries, meaning that when Spark encounters a *FetchFailedException*, it may retry fetching the data from the unavailable (the one being rebooted) node a few times before marking it permanently unavailable. However, if the rebooted node recovers quickly within this retry window, some executors might successfully fetch the data after a retry. *This leads to duplicate processing of the same data partition*.
>>>
>>> I am not sure this one applies to your Spark version, but Spark may speculatively execute tasks on different executors to improve performance. If a task fails due to the *FetchFailedException*, a speculative task might be launched on another executor. This is where the fun and games start. If the unavailable node recovers before the speculative task finishes, both the original and the speculative task might complete successfully, *resulting in duplicates*. With regard to missing data: if the data node reboot leads to data corruption or loss, some data partitions might be completely unavailable. In this case, Spark may skip processing that missing data, leading to missing data in the final output.
>>>
>>> Potential remedies: Spark offers some features to mitigate these issues, but it might not guarantee complete elimination of duplicates or data loss. You can adjust parameters like *spark.shuffle.io.retryWait* and *spark.speculation* to control retry attempts and speculative execution behavior. Lineage tracking is there to help: Spark can track data lineage, allowing you to identify potentially corrupted or missing data in some cases. You can consider persisting intermediate results to reliable storage (like HDFS, GCS, or another cloud storage) to avoid data loss in case of node failures. Your mileage varies, as it adds additional processing overhead, but it can ensure data integrity (see the sketch below).
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist
>>> London
>>> United Kingdom
>>>
>>> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
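>>> To make the "persist intermediate results" remedy above concrete, a sketch under assumed paths (it trades extra I/O for the ability to resume from the durable copy):
>>>
>>>     import org.apache.spark.sql.SparkSession
>>>
>>>     val spark = SparkSession.builder().appName("persist-sketch").getOrCreate()
>>>
>>>     // A shuffle-heavy intermediate result.
>>>     val intermediate = spark.read.parquet("hdfs:///data/input").repartition(200)
>>>
>>>     // Materialise it to reliable storage before the next stage runs.
>>>     intermediate.write.mode("overwrite").parquet("hdfs:///tmp/stage1")
>>>
>>>     // Downstream stages read the durable copy, so a node reboot cannot
>>>     // force recomputation back through the failed shuffle.
>>>     val stage1 = spark.read.parquet("hdfs:///tmp/stage1")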
>>> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>> In the list of JIRAs I didn't find anything related to FetchFailedException.
>>>>
>>>> As mentioned above:
>>>>
>>>> "When a Spark job shows a FetchFailedException it creates some duplicate data and we also see some data missing; please explain why. We have a scenario where the Spark job complains of a FetchFailedException because one of the data nodes got rebooted in the middle of the job run. Now, due to this, we have some duplicate data and some missing data. Why is Spark not handling this scenario correctly? We shouldn't miss any data and we shouldn't create duplicate data."
>>>>
>>>> We have to rerun the job to fix this data quality issue. Please let me know why this case is not handled properly by Spark.
>>>>
>>>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <dongjoon.h...@gmail.com> wrote:
>>>>
>>>>> Please use the URL as the full string, including the '()' part.
>>>>>
>>>>> Or you can search directly in the ASF Jira with the 'Spark' project and three labels: 'Correctness', 'correctness' and 'data-loss'.
>>>>>
>>>>> Dongjoon
>>>>>
>>>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <prem.re...@gmail.com> wrote:
>>>>>
>>>>>> Hello Dongjoon,
>>>>>> Thanks for emailing me. Could you please share the list of fixes, as the link you provided is not working?
>>>>>>
>>>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <dongj...@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> If you are observing correctness issues, you may be hitting some old (and already fixed) correctness issues. For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31 correctness issues:
>>>>>>>
>>>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>>>
>>>>>>> There are more fixes in 3.3, 3.4 and 3.5, too.
>>>>>>>
>>>>>>> Please use the latest version, Apache Spark 3.5.1, because Apache Spark 3.2 and 3.3 are in End-Of-Support status in the community.
>>>>>>>
>>>>>>> It would be helpful if you could report any correctness issues you see with Apache Spark 3.5.1.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dongjoon.
>>>>>>>
>>>>>>> On 2024/02/29 15:04:41 Prem Sahoo wrote:
>>>>>>> > When a Spark job shows a FetchFailedException it creates some duplicate data and we also see some data missing; please explain why. We have a scenario where the Spark job complains of a FetchFailedException because one of the data nodes got rebooted in the middle of the job run.
>>>>>>> >
>>>>>>> > Now, due to this, we have some duplicate data and some missing data. Why is Spark not handling this scenario correctly? We shouldn't miss any data and we shouldn't create duplicate data.
>>>>>>> >
>>>>>>> > I am using Spark version 3.2.0.
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org