super :(

On Mon, Mar 4, 2024 at 6:19 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> "... in a nutshell if fetchFailedException occurs due to data node reboot > then it can create duplicate / missing data . so this is more of > hardware(env issue ) rather than spark issue ." > > As an overall conclusion your point is correct but again the answer is not > binary. > > Spark core relies on a distributed file system to store data across data > nodes. When Spark needs to process data, it fetches the required blocks > from the data nodes.* FetchFailedException*: means that Spark > encountered an error while fetching data blocks from a data node. If a data > node reboots unexpectedly, it becomes unavailable to Spark for a > period. During this time, Spark might attempt to fetch data blocks from the > unavailable node, resulting in the FetchFailedException.. Depending on the > timing and nature of the reboot and data access, this exception can lead > to:the following: > > - Duplicate Data: If Spark retries the fetch operation successfully > after the reboot, it might end up processing the same data twice, leading > to duplicates. > - Missing Data: If Spark cannot fetch all required data blocks due to > the unavailable data node, some data might be missing from the processing > results. > > The root cause of this issue lies in the data node reboot itself. So we > can conclude that it is not a problem with Spark core functionality but > rather an environmental issue within the distributed storage systemL You > need to ensure that your nodes are stable and minimise unexpected reboots > for whatever reason. Look at the host logs or run /usr/bin/dmesg to see > what happened.. > > Good luck > > Mich Talebzadeh, > Dad | Technologist | Solutions Architect | Engineer > London > United Kingdom > > > view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* The information provided is correct to the best of my > knowledge but of course cannot be guaranteed . It is essential to note > that, as with any advice, quote "one test result is worth one-thousand > expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von > Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". > > > On Mon, 4 Mar 2024 at 01:30, Prem Sahoo <prem.re...@gmail.com> wrote: > >> thanks Mich, in a nutshell if fetchFailedException occurs due to data >> node reboot then it can create duplicate / missing data . so this is >> more of hardware(env issue ) rather than spark issue . >> >> >> >> On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <mich.talebza...@gmail.com> >> wrote: >> >>> Hi, >>> >>> It seems to me that there are issues related to below >>> >>> *<Prem> I think when a task failed in between and retry task started >>> and completed it may create duplicate as failed task has some data + retry >>> task has full data. but my question is why spark keeps delta data or >>> according to you if speculative and original task completes generally spark >>> kills one of the tasks to get rid of dups data. when a data node is >>> rebooted then spark fault tolerant should go to other nodes isn't it ? then >>> why it has missing data.* >>> >>> Spark is designed to be fault-tolerant through lineage and >>> recomputation. However, there are scenarios where speculative execution or >>> task retries might lead to duplicated or missing data. So what are these? >>> >>> - Task Failure and Retry: You are correct that a failed task might have >>> processed some data before encountering the FetchFailedException. 
> Good luck
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>
> On Mon, 4 Mar 2024 at 01:30, Prem Sahoo <prem.re...@gmail.com> wrote:
>
>> thanks Mich, in a nutshell if fetchFailedException occurs due to a data node reboot then it can create duplicate / missing data, so this is more of a hardware (env) issue rather than a Spark issue.
>>
>> On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> It seems to me that the issues relate to your points below.
>>>
>>> *<Prem> I think when a task failed in between and a retry task started and completed, it may create duplicates, as the failed task has some data + the retry task has full data. But my question is why Spark keeps the delta data; or, according to you, if the speculative and original tasks both complete, Spark generally kills one of the tasks to get rid of duplicate data. When a data node is rebooted, Spark's fault tolerance should go to other nodes, shouldn't it? Then why is there missing data?*
>>>
>>> Spark is designed to be fault-tolerant through lineage and recomputation. However, there are scenarios where speculative execution or task retries might lead to duplicated or missing data. So what are these?
>>>
>>> - Task Failure and Retry: You are correct that a failed task might have processed some data before encountering the FetchFailedException. If a retry succeeds, it would process the entire data partition again, leading to duplicates. When a task fails, Spark may recompute the lost data by rerunning the lost task on another node. The output of the retried task is typically combined with the output of the original task during the final stage of the computation. This combination is done to handle scenarios where the original task partially completed and generated some output before failing. Spark does not intentionally store partially processed data. However, due to retries and speculative execution, duplicate processing can occur. To the best of my knowledge, Spark itself does not have a mechanism to identify and eliminate duplicates automatically. While Spark might sometimes kill speculative tasks if the original one finishes, this is not guaranteed behaviour; it depends on various factors like scheduling and task dependencies.
>>>
>>> - Speculative Execution: Spark supports speculative execution, where the same task is launched on multiple executors simultaneously. The result of the first completed task is used, and the others are usually killed to avoid duplicated results. However, speculative execution might introduce some duplication in the final output if tasks on different executors complete successfully.
>>>
>>> - Node Reboots and Fault Tolerance: If the data node reboot leads to data corruption or loss, that data might be unavailable to Spark. Even with fault tolerance, Spark cannot recover completely missing data. Fault tolerance focuses on recovering from issues like executor failures, not data loss on storage nodes. Overall, Spark's fault tolerance is designed to handle executor failures by rescheduling tasks on other available executors, and temporary network issues by retrying fetches based on configuration.
>>>
>>> Here are some things to consider (a sketch of how to set these follows the list):
>>>
>>> - Minimize retries: Adjust spark.shuffle.io.maxRetries to a lower value such as 1 or 2 to reduce the chance of duplicate processing attempts, if retries are suspected to be a source.
>>> - Disable speculative execution if needed: Consider disabling speculative execution (spark.speculation=false) if duplicates are a major concern. However, this might impact performance.
>>> - Data persistence: As mentioned in the previous reply, persist intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity is critical. This ensures data availability even during node failures.
>>> - Data validation checks: Implement data validation checks after processing to identify potential duplicates or missing data.
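>>> As a rough illustration only: the property names are standard Spark configuration, while the paths and the key columns (id, event_time) are hypothetical and depend on your data.
>>>
>>>     import org.apache.spark.sql.SparkSession
>>>
>>>     // Lower shuffle-fetch retries and switch speculation off at session build time.
>>>     val spark = SparkSession.builder()
>>>       .appName("fetchFailureMitigation")
>>>       .config("spark.shuffle.io.maxRetries", "2")
>>>       .config("spark.speculation", "false")
>>>       .getOrCreate()
>>>
>>>     // Simple post-processing validation: compare total rows against rows
>>>     // that are distinct on the columns assumed to form a unique key.
>>>     val out = spark.read.parquet("hdfs:///warehouse/output")
>>>     val total = out.count()
>>>     val unique = out.select("id", "event_time").distinct().count()
>>>     if (total != unique) println(s"WARNING: ${total - unique} duplicate rows detected")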
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>
>>> On Sat, 2 Mar 2024 at 01:43, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>
>>>> Hello Mich,
>>>> thanks for your reply.
>>>>
>>>> As an engineer I can chip in. You may have partial execution and retries, meaning that when Spark encounters a *FetchFailedException*, it may retry fetching the data from the unavailable (the one being rebooted) node a few times before marking it permanently unavailable. However, if the rebooted node recovers quickly, within this retry window, some executors might successfully fetch the data after a retry. *This leads to duplicate processing of the same data partition*.
>>>>
>>>> <Prem> The data node reboot is taking more than 20 mins and our config is spark.network.timeout=300s, so we don't have duplicates for the above reason.
>>>>
>>>> I am not sure this one applies to your Spark version, but Spark may speculatively execute tasks on different executors to improve performance. If a task fails due to the *FetchFailedException*, a speculative task might be launched on another executor. This is where the fun and games start. If the unavailable node recovers before the speculative task finishes, both the original and speculative tasks might complete successfully, *resulting in duplicates*. With regard to missing data, if the data node reboot leads to data corruption or loss, some data partitions might be completely unavailable. In this case, Spark may skip processing that missing data, leading to missing data in the final output.
>>>>
>>>> <Prem> I think when a task failed in between and a retry task started and completed, it may create duplicates, as the failed task has some data + the retry task has full data. But my question is why Spark keeps the delta data; or, according to you, if the speculative and original tasks both complete, Spark generally kills one of the tasks to get rid of duplicate data. When a data node is rebooted, Spark's fault tolerance should go to other nodes, shouldn't it? Then why is there missing data?
>>>>
>>>> Potential remedies: Spark offers some features to mitigate these issues, but it might not guarantee complete elimination of duplicates or data loss. You can adjust parameters like *spark.shuffle.io.retryWait* and *spark.speculation* to control retry attempts and speculative execution behaviour. Lineage tracking is there to help: Spark can track data lineage, allowing you to identify potentially corrupted or missing data in some cases. You can consider persisting intermediate data results to reliable storage (like HDFS or GCS or another cloud storage) to avoid data loss in case of node failures. Your mileage varies, as it adds additional processing overhead, but it can ensure data integrity.
>>>>
>>>> <Prem> How will Spark handle these without a checkpoint, as it will slow down the process? The data loss or duplication I have is due to the fetchFailedException that is part of the data node reboot.
>>>> I have a few configs to minimize fetchFailedException:
>>>> spark.network.timeout=300s
>>>> spark.reducer.maxReqsInFlight=4
>>>> spark.shuffle.io.retryWait=30s
>>>> spark.shuffle.io.maxRetries=3
>>>>
>>>> When we get a fetchFailedException due to a data node reboot, Spark should handle it gracefully, shouldn't it? Or how should we handle it?
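>>>> For reference, persisting an intermediate result to reliable storage (short of full RDD checkpointing) would look roughly like the sketch below. The paths, the stage1 name and the column names are hypothetical.
>>>>
>>>>     import org.apache.spark.sql.SparkSession
>>>>
>>>>     val spark = SparkSession.builder().appName("persistIntermediate").getOrCreate()
>>>>
>>>>     // Hypothetical expensive intermediate stage.
>>>>     val stage1 = spark.read.parquet("hdfs:///input").filter("value IS NOT NULL")
>>>>
>>>>     // Materialise the intermediate result on reliable storage, then read it
>>>>     // back. A later failure re-reads this copy instead of re-fetching shuffle
>>>>     // output from a node that may have rebooted.
>>>>     stage1.write.mode("overwrite").parquet("hdfs:///staging/stage1")
>>>>     val stable = spark.read.parquet("hdfs:///staging/stage1")
>>>>
>>>>     stable.groupBy("key").count().write.mode("overwrite").parquet("hdfs:///output")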
>>>> On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Your point -> "When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing, please explain why. We have a scenario when the spark job complains *FetchFailedException* as one of the data nodes got rebooted in the middle of the job running."
>>>>>
>>>>> As an engineer I can chip in. You may have partial execution and retries, meaning that when Spark encounters a *FetchFailedException*, it may retry fetching the data from the unavailable (the one being rebooted) node a few times before marking it permanently unavailable. However, if the rebooted node recovers quickly, within this retry window, some executors might successfully fetch the data after a retry. *This leads to duplicate processing of the same data partition*.
>>>>>
>>>>> I am not sure this one applies to your Spark version, but Spark may speculatively execute tasks on different executors to improve performance. If a task fails due to the *FetchFailedException*, a speculative task might be launched on another executor. This is where the fun and games start. If the unavailable node recovers before the speculative task finishes, both the original and speculative tasks might complete successfully, *resulting in duplicates*. With regard to missing data, if the data node reboot leads to data corruption or loss, some data partitions might be completely unavailable. In this case, Spark may skip processing that missing data, leading to missing data in the final output.
>>>>>
>>>>> Potential remedies: Spark offers some features to mitigate these issues, but it might not guarantee complete elimination of duplicates or data loss. You can adjust parameters like *spark.shuffle.io.retryWait* and *spark.speculation* to control retry attempts and speculative execution behaviour (a sketch follows below). Lineage tracking is there to help: Spark can track data lineage, allowing you to identify potentially corrupted or missing data in some cases. You can consider persisting intermediate data results to reliable storage (like HDFS or GCS or another cloud storage) to avoid data loss in case of node failures. Your mileage varies, as it adds additional processing overhead, but it can ensure data integrity.
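>>>>> A minimal sketch of tuning speculative execution; the property names are standard Spark configuration, and the values shown are illustrative rather than recommendations.
>>>>>
>>>>>     import org.apache.spark.SparkConf
>>>>>     import org.apache.spark.sql.SparkSession
>>>>>
>>>>>     val conf = new SparkConf()
>>>>>       .set("spark.speculation", "true")           // set "false" to rule out speculative duplicates
>>>>>       .set("spark.speculation.quantile", "0.9")   // fraction of tasks finished before speculating
>>>>>       .set("spark.speculation.multiplier", "2")   // how much slower than the median before speculating
>>>>>       .set("spark.shuffle.io.retryWait", "30s")   // wait between shuffle fetch retries
>>>>>
>>>>>     val spark = SparkSession.builder().config(conf).appName("speculationTuning").getOrCreate()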
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>>
>>>>> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>>>
>>>>>> Hello All,
>>>>>> in the list of JIRAs I didn't find anything related to fetchFailedException.
>>>>>>
>>>>>> As mentioned above:
>>>>>>
>>>>>> "When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing, please explain why. We have a scenario when the spark job complains FetchFailedException as one of the data nodes got rebooted in the middle of the job running.
>>>>>> Now due to this we have a few duplicate records and a few missing records. Why is Spark not handling this scenario correctly? Ideally we shouldn't miss any data and we shouldn't create duplicate data."
>>>>>>
>>>>>> We have to rerun the job again to fix this data quality issue.
>>>>>> Please let me know why this case is not handled properly by Spark.
>>>>>>
>>>>>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <dongjoon.h...@gmail.com> wrote:
>>>>>>
>>>>>>> Please use the URL as the full string, including the '()' part.
>>>>>>>
>>>>>>> Or you can search directly in ASF Jira with the 'Spark' project and the three labels 'Correctness', 'correctness' and 'data-loss'.
>>>>>>>
>>>>>>> Dongjoon
>>>>>>>
>>>>>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <prem.re...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Dongjoon,
>>>>>>>> Thanks for emailing me.
>>>>>>>> Could you please share a list of fixes, as the link provided by you is not working?
>>>>>>>>
>>>>>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <dongj...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> If you are observing correctness issues, you may be hitting some old (and fixed) correctness issues.
>>>>>>>>>
>>>>>>>>> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31 correctness issues.
>>>>>>>>>
>>>>>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>>>>>
>>>>>>>>> There are more fixes in 3.3 and 3.4 and 3.5, too.
>>>>>>>>>
>>>>>>>>> Please use the latest version, Apache Spark 3.5.1, because Apache Spark 3.2 and 3.3 are in End-Of-Support status in the community.
>>>>>>>>>
>>>>>>>>> It would help if you could report any correctness issues with Apache Spark 3.5.1.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dongjoon.
>>>>>>>>>
>>>>>>>>> On 2024/02/29 15:04:41 Prem Sahoo wrote:
>>>>>>>>> > When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing, please explain why. We have a scenario when the spark job complains FetchFailedException as one of the data nodes got rebooted in the middle of the job running.
>>>>>>>>> >
>>>>>>>>> > Now due to this we have a few duplicate records and a few missing records. Why is Spark not handling this scenario correctly? Ideally we shouldn't miss any data and we shouldn't create duplicate data.
>>>>>>>>> >
>>>>>>>>> > I am using the Spark 3.2.0 version.
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org