Thanks Jason for the detailed information and the bug associated with it. Hopefully someone can provide more information about this pressing issue.
On Mon, Mar 4, 2024 at 1:26 PM Jason Xu <jasonxu.sp...@gmail.com> wrote:

> Hi Prem,
>
> From the symptoms of shuffle fetch failure with a few duplicate and a few
> missing records, I think you might have run into this correctness bug:
> https://issues.apache.org/jira/browse/SPARK-38388.
>
> Node/shuffle failure is hard to avoid. I wonder if you have
> non-deterministic logic and are calling repartition() (round-robin
> partitioning) in your code? If you can avoid either of these, you can
> avoid the issue for now. Fixing the root cause requires non-trivial
> effort; I don't think a solution is available yet.
>
> I have heard that there are community efforts to solve this issue, but I
> lack detailed information. Hopefully, someone with more knowledge can
> provide further insight.
>
> Best,
> Jason
>
> On Mon, Mar 4, 2024 at 9:41 AM Prem Sahoo <prem.re...@gmail.com> wrote:
>
>> super :(
>>
>> On Mon, Mar 4, 2024 at 6:19 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> "... in a nutshell, if FetchFailedException occurs due to a data node
>>> reboot, then it can create duplicate/missing data, so this is more of a
>>> hardware (environment) issue rather than a Spark issue."
>>>
>>> As an overall conclusion your point is correct, but again the answer is
>>> not binary.
>>>
>>> Spark core relies on a distributed file system to store data across
>>> data nodes. When Spark needs to process data, it fetches the required
>>> blocks from the data nodes. A *FetchFailedException* means that Spark
>>> encountered an error while fetching data blocks from a data node. If a
>>> data node reboots unexpectedly, it becomes unavailable to Spark for a
>>> period. During this time, Spark might attempt to fetch data blocks from
>>> the unavailable node, resulting in the FetchFailedException. Depending
>>> on the timing and nature of the reboot and data access, this exception
>>> can lead to the following:
>>>
>>> - Duplicate data: if Spark retries the fetch operation successfully
>>> after the reboot, it might end up processing the same data twice,
>>> leading to duplicates.
>>> - Missing data: if Spark cannot fetch all required data blocks from the
>>> unavailable data node, some data might be missing from the processing
>>> results.
>>>
>>> The root cause of this issue lies in the data node reboot itself. So we
>>> can conclude that it is not a problem with Spark core functionality but
>>> rather an environmental issue within the distributed storage system.
>>> You need to ensure that your nodes are stable and minimise unexpected
>>> reboots for whatever reason. Look at the host logs or run
>>> /usr/bin/dmesg to see what happened.
>>>
>>> Good luck
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>> view my LinkedIn profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice: "one test result is worth one thousand expert
>>> opinions" (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
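To make Jason's workaround concrete, here is a minimal, hypothetical Scala sketch: round-robin repartition(n) assigns rows to partitions non-deterministically, so a task retried after a fetch failure can rebuild a different partition (the scenario in SPARK-38388), whereas repartitioning on a stable key column makes the recomputation deterministic. The paths and the "customer_id" column are illustrative, not from this thread:

    import org.apache.spark.sql.SparkSession

    object DeterministicRepartitionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("deterministic-repartition-sketch")
          .getOrCreate()

        val df = spark.read.parquet("hdfs:///input/path") // illustrative path

        // Risky under retries: round-robin partitioning is not deterministic,
        // so recomputing a lost shuffle block can produce different contents.
        // val shuffled = df.repartition(200)

        // Safer: hash-partition on a stable key, so a recomputed task rebuilds
        // exactly the same partition ("customer_id" is a hypothetical column).
        val shuffled = df.repartition(200, df("customer_id"))

        shuffled.write.mode("overwrite").parquet("hdfs:///output/path")
        spark.stop()
      }
    }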
>>> On Mon, 4 Mar 2024 at 01:30, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>
>>>> Thanks Mich. In a nutshell, if FetchFailedException occurs due to a
>>>> data node reboot, then it can create duplicate/missing data, so this
>>>> is more of a hardware (environment) issue rather than a Spark issue.
>>>>
>>>> On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It seems to me that the issues relate to the points below.
>>>>>
>>>>> *<Prem> I think when a task fails in between, and a retry task starts
>>>>> and completes, it may create duplicates, as the failed task has some
>>>>> data and the retry task has the full data. But my question is why
>>>>> Spark keeps the delta data; or, according to you, if the speculative
>>>>> and original tasks both complete, Spark generally kills one of the
>>>>> tasks to get rid of duplicate data. When a data node is rebooted,
>>>>> then Spark's fault tolerance should go to other nodes, shouldn't it?
>>>>> Then why is there missing data?*
>>>>>
>>>>> Spark is designed to be fault-tolerant through lineage and
>>>>> recomputation. However, there are scenarios where speculative
>>>>> execution or task retries might lead to duplicated or missing data.
>>>>> So what are these?
>>>>>
>>>>> - Task failure and retry: you are correct that a failed task might
>>>>> have processed some data before encountering the
>>>>> FetchFailedException. If a retry succeeds, it processes the entire
>>>>> data partition again, which can lead to duplicates. When a task
>>>>> fails, Spark may recompute the lost data by re-running the task on
>>>>> another node. The output of the retried task is typically combined
>>>>> with the output of the original task during the final stage of the
>>>>> computation, to handle scenarios where the original task partially
>>>>> completed and generated some output before failing. Spark does not
>>>>> intentionally store partially processed data; however, due to retries
>>>>> and speculative execution, duplicate processing can occur. To the
>>>>> best of my knowledge, Spark itself does not have a mechanism to
>>>>> identify and eliminate duplicates automatically. While Spark might
>>>>> sometimes kill speculative tasks if the original one finishes, this
>>>>> is not guaranteed behaviour; it depends on factors such as scheduling
>>>>> and task dependencies.
>>>>>
>>>>> - Speculative execution: Spark supports speculative execution, where
>>>>> the same task is launched on multiple executors simultaneously. The
>>>>> result of the first completed task is used, and the others are
>>>>> usually killed to avoid duplicated results. However, speculative
>>>>> execution might introduce some duplication in the final output if
>>>>> tasks on different executors complete successfully.
>>>>>
>>>>> - Node reboots and fault tolerance: if the data node reboot leads to
>>>>> data corruption or loss, that data might be unavailable to Spark.
>>>>> Even with fault tolerance, Spark cannot recover completely missing
>>>>> data. Fault tolerance focuses on recovering from issues like executor
>>>>> failures, not data loss on storage nodes. Overall, Spark's fault
>>>>> tolerance is designed to handle executor failures by rescheduling
>>>>> tasks on other available executors, and temporary network issues by
>>>>> retrying fetches based on configuration.
>>>>> Here are some things to consider (a sketch of these settings follows
>>>>> the list):
>>>>>
>>>>> - Minimise retries: adjust spark.shuffle.io.maxRetries to a lower
>>>>> value such as 1 or 2 to reduce the chance of duplicate processing
>>>>> attempts, if retries are suspected to be a source.
>>>>> - Disable speculative execution if needed: consider disabling
>>>>> speculative execution (spark.speculation=false) if duplicates are a
>>>>> major concern. However, this might impact performance.
>>>>> - Data persistence: as mentioned in the previous reply, persist
>>>>> intermediate data to reliable storage (HDFS, GCS, etc.) if data
>>>>> integrity is critical. This ensures data availability even during
>>>>> node failures.
>>>>> - Data validation checks: implement data validation checks after
>>>>> processing to identify potential duplicates or missing data.
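A minimal sketch of how the first two settings might be wired into a Scala application. The values are illustrative, not recommendations (spark.shuffle.io.maxRetries defaults to 3 and spark.speculation to false):

    import org.apache.spark.sql.SparkSession

    // Illustrative shuffle-hardening settings; tune for your own workload.
    val spark = SparkSession.builder()
      .appName("shuffle-hardening-sketch")
      // Fail fast instead of repeatedly re-fetching from a rebooting node.
      .config("spark.shuffle.io.maxRetries", "2")
      // Wait longer between fetch retries (default is 5s).
      .config("spark.shuffle.io.retryWait", "30s")
      // Disable speculation if duplicate output is the bigger worry.
      .config("spark.speculation", "false")
      .getOrCreate()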
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>> view my LinkedIn profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>>> that, as with any advice: "one test result is worth one thousand
>>>>> expert opinions" (Wernher von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>>
>>>>> On Sat, 2 Mar 2024 at 01:43, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>>>
>>>>>> Hello Mich,
>>>>>> thanks for your reply.
>>>>>>
>>>>>> As an engineer I can chip in. You may have partial execution and
>>>>>> retries, meaning when Spark encounters a *FetchFailedException*, it
>>>>>> may retry fetching the data from the unavailable (the one being
>>>>>> rebooted) node a few times before marking it permanently
>>>>>> unavailable. However, if the rebooted node recovers quickly within
>>>>>> this retry window, some executors might successfully fetch the data
>>>>>> after a retry. *This leads to duplicate processing of the same data
>>>>>> partition*.
>>>>>>
>>>>>> <Prem> The data node reboot takes more than 20 minutes and our
>>>>>> config is spark.network.timeout=300s, so we don't have duplicates
>>>>>> for the above reason.
>>>>>>
>>>>>> I am not sure this one applies to your Spark version, but Spark may
>>>>>> speculatively execute tasks on different executors to improve
>>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>>> speculative task might be launched on another executor. This is
>>>>>> where the fun and games start. If the unavailable node recovers
>>>>>> before the speculative task finishes, both the original and
>>>>>> speculative tasks might complete successfully, *resulting in
>>>>>> duplicates*. With regard to missing data, if the data node reboot
>>>>>> leads to data corruption or loss, some data partitions might be
>>>>>> completely unavailable. In this case, Spark may skip processing that
>>>>>> missing data, leading to missing data in the final output.
>>>>>>
>>>>>> <Prem> I think when a task fails in between, and a retry task starts
>>>>>> and completes, it may create duplicates, as the failed task has some
>>>>>> data and the retry task has the full data. But my question is why
>>>>>> Spark keeps the delta data; or, according to you, if the speculative
>>>>>> and original tasks both complete, Spark generally kills one of the
>>>>>> tasks to get rid of duplicate data. When a data node is rebooted,
>>>>>> then Spark's fault tolerance should go to other nodes, shouldn't it?
>>>>>> Then why is there missing data?
>>>>>>
>>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>>> issues, but they might not guarantee complete elimination of
>>>>>> duplicates or data loss. You can adjust parameters like
>>>>>> *spark.shuffle.io.retryWait* and *spark.speculation* to control
>>>>>> retry attempts and speculative execution behaviour. Lineage tracking
>>>>>> is there to help: Spark can track data lineage, allowing you to
>>>>>> identify potentially corrupted or missing data in some cases. You
>>>>>> can consider persisting intermediate data results to reliable
>>>>>> storage (like HDFS, GCS or another cloud storage) to avoid data loss
>>>>>> in case of node failures. Your mileage may vary, as it adds
>>>>>> additional processing overhead but can ensure data integrity.
>>>>>>
>>>>>> <Prem> How will Spark handle these without a checkpoint, as it will
>>>>>> slow down the process? The data loss or duplication I have is due to
>>>>>> fetchFailedException as part of a data node reboot.
>>>>>> I have a few configs to minimize fetchFailedException:
>>>>>> spark.network.timeout=300s
>>>>>> spark.reducer.maxReqsInFlight=4
>>>>>> spark.shuffle.io.retryWait=30s
>>>>>> spark.shuffle.io.maxRetries=3
>>>>>>
>>>>>> When we get a fetchFailedException due to a data node reboot, Spark
>>>>>> should handle it gracefully, shouldn't it? Or how should we handle
>>>>>> it?
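Acting on the "data validation checks" suggestion above, a hypothetical Scala sketch that flags the two symptoms in this thread: a row-count mismatch (missing or extra rows) and duplicate keys. The paths and the "order_id" key column are assumptions for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().appName("output-validation-sketch").getOrCreate()

    val source = spark.read.parquet("hdfs:///input/path")   // illustrative paths
    val output = spark.read.parquet("hdfs:///output/path")

    // Missing or extra rows show up as a count mismatch against the source.
    val srcCount = source.count()
    val outCount = output.count()

    // Duplicates show up as keys occurring more than once, assuming
    // "order_id" (hypothetical) is unique in the source data.
    val dupKeys = output.groupBy(col("order_id")).count().filter(col("count") > 1)

    if (srcCount != outCount || !dupKeys.isEmpty) {
      // Fail the job (or trigger a rerun) rather than publish bad data.
      sys.error(s"Validation failed: src=$srcCount out=$outCount dupKeys=${dupKeys.count()}")
    }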
>>>>>> On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Your point -> "When a Spark job shows FetchFailedException it
>>>>>>> creates a few duplicate data and we see a few data also missing;
>>>>>>> please explain why. We have a scenario where the Spark job
>>>>>>> complains of *FetchFailedException* as one of the data nodes got
>>>>>>> rebooted in the middle of the job running."
>>>>>>>
>>>>>>> As an engineer I can chip in. You may have partial execution and
>>>>>>> retries, meaning when Spark encounters a *FetchFailedException*, it
>>>>>>> may retry fetching the data from the unavailable (the one being
>>>>>>> rebooted) node a few times before marking it permanently
>>>>>>> unavailable. However, if the rebooted node recovers quickly within
>>>>>>> this retry window, some executors might successfully fetch the data
>>>>>>> after a retry. *This leads to duplicate processing of the same data
>>>>>>> partition*.
>>>>>>>
>>>>>>> I am not sure this one applies to your Spark version, but Spark may
>>>>>>> speculatively execute tasks on different executors to improve
>>>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>>>> speculative task might be launched on another executor. This is
>>>>>>> where the fun and games start. If the unavailable node recovers
>>>>>>> before the speculative task finishes, both the original and
>>>>>>> speculative tasks might complete successfully, *resulting in
>>>>>>> duplicates*. With regard to missing data, if the data node reboot
>>>>>>> leads to data corruption or loss, some data partitions might be
>>>>>>> completely unavailable. In this case, Spark may skip processing
>>>>>>> that missing data, leading to missing data in the final output.
>>>>>>>
>>>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>>>> issues, but they might not guarantee complete elimination of
>>>>>>> duplicates or data loss. You can adjust parameters like
>>>>>>> *spark.shuffle.io.retryWait* and *spark.speculation* to control
>>>>>>> retry attempts and speculative execution behaviour. Lineage
>>>>>>> tracking is there to help: Spark can track data lineage, allowing
>>>>>>> you to identify potentially corrupted or missing data in some
>>>>>>> cases. You can consider persisting intermediate data results to
>>>>>>> reliable storage (like HDFS, GCS or another cloud storage) to avoid
>>>>>>> data loss in case of node failures. Your mileage may vary, as it
>>>>>>> adds additional processing overhead but can ensure data integrity.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>> Dad | Technologist
>>>>>>> London
>>>>>>> United Kingdom
>>>>>>>
>>>>>>> view my LinkedIn profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>
>>>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>>>> knowledge but of course cannot be guaranteed. It is essential to
>>>>>>> note that, as with any advice: "one test result is worth one
>>>>>>> thousand expert opinions" (Wernher von Braun
>>>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
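On the "persist intermediate results" remedy, a hypothetical Scala sketch: checkpointing (or explicitly materialising) an intermediate DataFrame truncates its lineage, so a downstream failure re-reads stable storage instead of recomputing through the failed shuffle. All paths and the grouping column are illustrative:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("intermediate-persist-sketch").getOrCreate()

    // A reliable checkpoint location on HDFS/GCS/S3 (path is illustrative).
    spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

    val expensive = spark.read.parquet("hdfs:///input/path")
      .groupBy("customer_id")   // hypothetical column
      .count()

    // Option A: checkpoint truncates lineage; a later failure re-reads the
    // checkpoint files instead of re-running the shuffle.
    val stable = expensive.checkpoint()

    // Option B: materialise explicitly and read back.
    // expensive.write.mode("overwrite").parquet("hdfs:///tmp/intermediate")
    // val stable = spark.read.parquet("hdfs:///tmp/intermediate")

    stable.write.mode("overwrite").parquet("hdfs:///output/path")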
>>>>>>> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello All,
>>>>>>>> in the list of JIRAs I didn't find anything related to
>>>>>>>> fetchFailedException.
>>>>>>>>
>>>>>>>> As mentioned above:
>>>>>>>>
>>>>>>>> "When a Spark job shows FetchFailedException it creates a few
>>>>>>>> duplicate data and we see a few data also missing; please explain
>>>>>>>> why. We have a scenario where the Spark job complains of
>>>>>>>> FetchFailedException as one of the data nodes got rebooted in the
>>>>>>>> middle of the job running.
>>>>>>>> Now due to this we have a few duplicate data and a few missing
>>>>>>>> data. Why is Spark not handling this scenario correctly? We
>>>>>>>> shouldn't miss any data and we shouldn't create duplicate data."
>>>>>>>>
>>>>>>>> We have to rerun the job to fix this data quality issue. Please
>>>>>>>> let me know why this case is not handled properly by Spark.
>>>>>>>>
>>>>>>>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <dongjoon.h...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Please use the URL as the full string, including the '()' part.
>>>>>>>>>
>>>>>>>>> Or you can search directly in the ASF Jira with the 'Spark'
>>>>>>>>> project and three labels: 'Correctness', 'correctness' and
>>>>>>>>> 'data-loss'.
>>>>>>>>>
>>>>>>>>> Dongjoon
>>>>>>>>>
>>>>>>>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <prem.re...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Dongjoon,
>>>>>>>>>> Thanks for emailing me.
>>>>>>>>>> Could you please share a list of fixes, as the link you provided
>>>>>>>>>> is not working.
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <dongj...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> If you are observing correctness issues, you may have hit some
>>>>>>>>>>> old (and fixed) correctness issues.
>>>>>>>>>>>
>>>>>>>>>>> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31
>>>>>>>>>>> correctness issues.
>>>>>>>>>>>
>>>>>>>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>>>>>>>
>>>>>>>>>>> There are more fixes in 3.3, 3.4 and 3.5, too.
>>>>>>>>>>>
>>>>>>>>>>> Please use the latest version, Apache Spark 3.5.1, because
>>>>>>>>>>> Apache Spark 3.2 and 3.3 have reached End-Of-Support status in
>>>>>>>>>>> the community.
>>>>>>>>>>>
>>>>>>>>>>> It would be helpful if you could report any correctness issues
>>>>>>>>>>> against Apache Spark 3.5.1.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dongjoon.
>>>>>>>>>>>
>>>>>>>>>>> On 2024/02/29 15:04:41 Prem Sahoo wrote:
>>>>>>>>>>> > When a Spark job shows FetchFailedException it creates a few
>>>>>>>>>>> > duplicate data and we see a few data also missing; please
>>>>>>>>>>> > explain why. We have a scenario where the Spark job complains
>>>>>>>>>>> > of FetchFailedException as one of the data nodes got rebooted
>>>>>>>>>>> > in the middle of the job running.
>>>>>>>>>>> >
>>>>>>>>>>> > Now due to this we have a few duplicate data and a few
>>>>>>>>>>> > missing data. Why is Spark not handling this scenario
>>>>>>>>>>> > correctly? We shouldn't miss any data and we shouldn't create
>>>>>>>>>>> > duplicate data.
>>>>>>>>>>> >
>>>>>>>>>>> > I am using Spark version 3.2.0.
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org