Thanks Jason for the detailed information and the bug associated with it.
Hopefully someone can provide more information about this pressing issue.

On Mon, Mar 4, 2024 at 1:26 PM Jason Xu <jasonxu.sp...@gmail.com> wrote:

> Hi Prem,
>
> From the symptoms of shuffle fetch failure with some duplicate and some
> missing data, I think you may have run into this correctness bug:
> https://issues.apache.org/jira/browse/SPARK-38388.
>
> Node/shuffle failures are hard to avoid. I wonder if you have
> non-deterministic logic and are calling repartition() (round-robin
> partitioning) in your code? If you can avoid either of these, you can
> prevent the issue from occurring for now. Root-fixing the issue requires
> a non-trivial effort; I don't think there's a solution available yet.
>
> I have heard that there are community efforts to solve this issue, but I
> lack detailed information. Hopefully, someone with more knowledge can
> provide further insight.
>
> Best,
> Jason
>
> On Mon, Mar 4, 2024 at 9:41 AM Prem Sahoo <prem.re...@gmail.com> wrote:
>
>> super :(
>>
>> On Mon, Mar 4, 2024 at 6:19 AM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> "... in a nutshell, if FetchFailedException occurs due to a data node
>>> reboot then it can create duplicate / missing data, so this is more of a
>>> hardware (env) issue rather than a Spark issue."
>>>
>>> As an overall conclusion your point is correct but again the answer is
>>> not binary.
>>>
>>> Spark core relies on a distributed file system to store data across data
>>> nodes. When Spark needs to process data, it fetches the required blocks
>>> from the data nodes. *FetchFailedException* means that Spark
>>> encountered an error while fetching data blocks from a data node. If a data
>>> node reboots unexpectedly, it becomes unavailable to Spark for a
>>> period. During this time, Spark might attempt to fetch data blocks from the
>>> unavailable node, resulting in the FetchFailedException. Depending on the
>>> timing and nature of the reboot and data access, this exception can lead
>>> to the following:
>>>
>>>    - Duplicate Data: If Spark retries the fetch operation successfully
>>>    after the reboot, it might end up processing the same data twice, leading
>>>    to duplicates.
>>>    - Missing Data: If Spark cannot fetch all required data blocks due
>>>    to the unavailable data node, some data might be missing from the
>>>    processing results.
>>>
>>> The root cause of this issue lies in the data node reboot itself. So we
>>> can conclude that it is not a problem with Spark core functionality but
>>> rather an environmental issue within the distributed storage system. You
>>> need to ensure that your nodes are stable and minimise unexpected reboots
>>> for whatever reason. Look at the host logs or run /usr/bin/dmesg to see
>>> what happened.
>>>
>>> Good luck
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>>
>>> On Mon, 4 Mar 2024 at 01:30, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>
>>>> thanks Mich, in a nutshell, if FetchFailedException occurs due to a data
>>>> node reboot then it can create duplicate / missing data, so this is more
>>>> of a hardware (env) issue rather than a Spark issue.
>>>>
>>>>
>>>>
>>>> On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It seems to me that there are issues related to below
>>>>>
>>>>> *<Prem> I think when a task fails midway and a retry task starts and
>>>>> completes, it may create duplicates, as the failed task has some data +
>>>>> the retry task has the full data. But my question is why Spark keeps the
>>>>> delta data; or, per your point, if the speculative and the original task
>>>>> both complete, Spark generally kills one of the tasks to get rid of the
>>>>> duplicate data. When a data node is rebooted, Spark's fault tolerance
>>>>> should go to other nodes, shouldn't it? Then why is data missing?*
>>>>>
>>>>> Spark is designed to be fault-tolerant through lineage and
>>>>> recomputation. However, there are scenarios where speculative execution or
>>>>> task retries might lead to duplicated or missing data. So what are these?
>>>>>
>>>>> - Task Failure and Retry: You are correct that a failed task might
>>>>> have processed some data before encountering the FetchFailedException. If 
>>>>> a
>>>>> retry succeeds, it would process the entire data partition again, leading
>>>>> to duplicates. When a task fails, Spark may recover the lost data by
>>>>> re-running the task on another node. The output of the retried task
>>>>> is typically combined with the output of the original task during the 
>>>>> final
>>>>> stage of the computation. This combination is done to handle scenarios
>>>>> where the original task partially completed and generated some output
>>>>> before failing. Spark does not intentionally store partially processed
>>>>> data. However, due to retries and speculative execution, duplicate
>>>>> processing can occur. To the best of my knowledge, Spark itself doesn't
>>>>> have a mechanism to identify and eliminate duplicates automatically. While
>>>>> Spark might sometimes kill speculative tasks if the original one finishes,
>>>>> it is not a guaranteed behavior. This depends on various factors like
>>>>> scheduling and task dependencies.
>>>>>
>>>>> - Speculative Execution: Spark supports speculative execution, where
>>>>> the same task is launched on multiple executors simultaneously. The result
>>>>> of the first completed task is used, and the others are usually killed to
>>>>> avoid duplicated results. However, speculative execution might introduce
>>>>> some duplication in the final output if tasks on different executors
>>>>> complete successfully.
>>>>>
>>>>> - Node Reboots and Fault Tolerance: If the data node reboot leads to
>>>>> data corruption or loss, that data might be unavailable to Spark. Even 
>>>>> with
>>>>> fault tolerance, Spark cannot recover completely missing data. Fault
>>>>> tolerance focuses on recovering from issues like executor failures, not
>>>>> data loss on storage nodes. Overall, Spark's fault tolerance is designed 
>>>>> to
>>>>> handle executor failures by rescheduling tasks on other available 
>>>>> executors
>>>>> and temporary network issues by retrying fetches based on configuration.
>>>>>
>>>>> Here are some things to consider:
>>>>>
>>>>> - Minimize retries: Adjust spark.shuffle.io.maxRetries to a lower
>>>>> value such as  1 or 2 to reduce the chance of duplicate processing
>>>>> attempts, if retries are suspected to be a source.
>>>>> - Disable speculative execution if needed: Consider disabling
>>>>> speculative execution (spark.speculation=false) if duplicates are a major
>>>>> concern. However, this might impact performance.
>>>>> - Data persistence: As mentioned in the previous reply, persist
>>>>> intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity
>>>>> is critical. This ensures data availability even during node failures.
>>>>> - Data validation checks: Implement data validation checks after
>>>>> processing to identify potential duplicates or missing data.
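As a hedged sketch only, the settings from the points above might be passed on the command line like this (the values are illustrative and `your_job.py` is a placeholder, not from the thread):

```shell
# Illustrative spark-submit invocation reflecting the mitigations above:
# fewer shuffle retries, speculation disabled. Tune values for your workload.
spark-submit \
  --conf spark.shuffle.io.maxRetries=2 \
  --conf spark.shuffle.io.retryWait=30s \
  --conf spark.speculation=false \
  your_job.py
```

The same keys can equally go into spark-defaults.conf or be set on the SparkSession builder; the flags shown are standard Spark configuration properties.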
>>>>> HTH
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed . It is essential to note
>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>> expert opinions (Werner
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>>
>>>>>
>>>>> On Sat, 2 Mar 2024 at 01:43, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>>>
>>>>>> Hello Mich,
>>>>>> thanks for your reply.
>>>>>>
>>>>>> As an engineer I can chip in. You may have partial execution and
>>>>>> retries  meaning when spark encounters a *FetchFailedException*, it
>>>>>> may retry fetching the data from the unavailable (the one being rebooted)
>>>>>> node a few times before marking it permanently unavailable. However, if 
>>>>>> the
>>>>>> rebooted node recovers quickly within this retry window, some executors
>>>>>> might successfully fetch the data after a retry. *This leads to
>>>>>> duplicate processing of the same data partition*.
>>>>>>
>>>>>> <Prem> The data node reboot takes more than 20 mins and our config is
>>>>>> spark.network.timeout=300s, so we don't have duplicates for the above reason.
>>>>>> I am not sure this one applies to your spark version but spark may
>>>>>> speculatively execute tasks on different executors to improve
>>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>>> speculative task might be launched on another executor. This is where fun
>>>>>> and games start. If the unavailable node recovers before the speculative
>>>>>> task finishes, both the original and speculative tasks might complete
>>>>>> successfully,* resulting in duplicates*. With regard to missing
>>>>>> data, if the data node reboot leads to data corruption or loss, some data
>>>>>> partitions might be completely unavailable. In this case, spark may skip
>>>>>> processing that missing data, leading to missing data in the final 
>>>>>> output.
>>>>>>
>>>>>> <Prem> I think when a task fails midway and a retry task starts and
>>>>>> completes, it may create duplicates, as the failed task has some data +
>>>>>> the retry task has the full data. But my question is why Spark keeps the
>>>>>> delta data; or, per your point, if the speculative and the original task
>>>>>> both complete, Spark generally kills one of the tasks to get rid of the
>>>>>> duplicate data. When a data node is rebooted, Spark's fault tolerance
>>>>>> should go to other nodes, shouldn't it? Then why is data missing?
>>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>>>> data loss. You can adjust parameters like *spark.shuffle.io.retryWait*
>>>>>> and *spark.speculation* to control retry attempts and speculative
>>>>>> execution behavior. Lineage tracking is there to help. Spark can track 
>>>>>> data
>>>>>> lineage, allowing you to identify potentially corrupted or missing data 
>>>>>> in
>>>>>> some cases. You can consider persisting intermediate data results to a
>>>>>> reliable storage (like HDFS or GCS or another cloud storage) to avoid 
>>>>>> data
>>>>>> loss in case of node failures.  Your mileage varies as it adds additional
>>>>>> processing overhead but can ensure data integrity.
>>>>>>
>>>>>> <Prem> How will Spark handle these without a checkpoint, as it will
>>>>>> slow down the process? The data loss or duplication is due to
>>>>>> FetchFailedException as part of a data node reboot.
>>>>>> I have a few configs to minimize FetchFailedException:
>>>>>> spark.network.timeout=300s
>>>>>> spark.reducer.maxReqsInFlight=4
>>>>>> spark.shuffle.io.retryWait=30s
>>>>>> spark.shuffle.io.maxRetries=3
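On the data-validation suggestion made earlier in the thread, a minimal post-job check could be sketched in pure Python (hypothetical row keys, not the Spark API): compare the output keys against the expected key set to surface duplicates and missing rows after a job that hit FetchFailedException.

```python
# Minimal post-job validation sketch: detect duplicated and missing row keys.
from collections import Counter

def validate(expected_keys, output_keys):
    """Return (duplicated keys, missing keys) for a job's output."""
    counts = Counter(output_keys)
    duplicates = sorted(k for k, c in counts.items() if c > 1)
    missing = sorted(set(expected_keys) - set(output_keys))
    return duplicates, missing

# e.g. a retried stage wrote key 2 twice and key 3 was lost with its node
dups, lost = validate([1, 2, 3, 4], [1, 2, 2, 4])
assert dups == [2] and lost == [3]
```

In a real pipeline the same idea can run as a Spark aggregation over a unique business key; the pure-Python form here is only to show the check itself.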
>>>>>>
>>>>>> When we get a FetchFailedException due to a data node reboot, Spark
>>>>>> should handle it gracefully, shouldn't it?
>>>>>> Or how should we handle it?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Your point -> "When a Spark job shows FetchFailedException it creates
>>>>>>> a few duplicate records and we also see a few missing, please explain
>>>>>>> why. We have a scenario where the Spark job complains of
>>>>>>> *FetchFailedException as one of the data nodes got rebooted in the
>>>>>>> middle of the job running."*
>>>>>>>
>>>>>>> As an engineer I can chip in. You may have partial execution and
>>>>>>> retries  meaning when spark encounters a *FetchFailedException*,
>>>>>>> it  may retry fetching the data from the unavailable (the one being
>>>>>>> rebooted) node a few times before marking it permanently unavailable.
>>>>>>> However, if the rebooted node recovers quickly within this retry window,
>>>>>>> some executors might successfully fetch the data after a retry. *This
>>>>>>> leads to duplicate processing of the same data partition*.
>>>>>>>
>>>>>>> I am not sure this one applies to your spark version but spark may
>>>>>>> speculatively execute tasks on different executors to improve
>>>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>>>> speculative task might be launched on another executor. This is where 
>>>>>>> fun
>>>>>>> and games start. If the unavailable node recovers before the speculative
>>>>>>> task finishes, both the original and speculative tasks might complete
>>>>>>> successfully,* resulting in duplicates*. With regard to missing
>>>>>>> data, if the data node reboot leads to data corruption or loss, some 
>>>>>>> data
>>>>>>> partitions might be completely unavailable. In this case, spark may skip
>>>>>>> processing that missing data, leading to missing data in the final 
>>>>>>> output.
>>>>>>>
>>>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>>>>> data loss. You can adjust parameters like *spark.shuffle.io.retryWait*
>>>>>>> and *spark.speculation* to control retry attempts and speculative
>>>>>>> execution behavior. Lineage tracking is there to help. Spark can track 
>>>>>>> data
>>>>>>> lineage, allowing you to identify potentially corrupted or missing data 
>>>>>>> in
>>>>>>> some cases. You can consider persisting intermediate data results to a
>>>>>>> reliable storage (like HDFS or GCS or another cloud storage) to avoid 
>>>>>>> data
>>>>>>> loss in case of node failures.  Your mileage varies as it adds 
>>>>>>> additional
>>>>>>> processing overhead but can ensure data integrity.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>> Dad | Technologist
>>>>>>> London
>>>>>>> United Kingdom
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <prem.re...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello All,
>>>>>>>> in the list of JIRAs i didn't find anything related to
>>>>>>>> fetchFailedException.
>>>>>>>>
>>>>>>>> as mentioned above
>>>>>>>>
>>>>>>>> "When a Spark job shows FetchFailedException it creates a few
>>>>>>>> duplicate records and we also see a few missing records, please explain
>>>>>>>> why. We have a scenario where the Spark job complains of
>>>>>>>> FetchFailedException as one of the data nodes got rebooted in the
>>>>>>>> middle of the job running.
>>>>>>>> Now due to this we have a few duplicate and a few missing records.
>>>>>>>> Why is Spark not handling this scenario correctly? We shouldn't
>>>>>>>> miss any data and we shouldn't create duplicate data."
>>>>>>>>
>>>>>>>> We have to rerun the job to fix this data quality issue.
>>>>>>>> Please let me know why this case is not handled properly by Spark.
>>>>>>>>
>>>>>>>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <
>>>>>>>> dongjoon.h...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Please use the URL as the full string, including the '()' part.
>>>>>>>>>
>>>>>>>>> Or you can search directly in the ASF Jira for the 'Spark' project and
>>>>>>>>> the three labels 'Correctness', 'correctness' and 'data-loss'.
>>>>>>>>>
>>>>>>>>> Dongjoon
>>>>>>>>>
>>>>>>>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <prem.re...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Dongjoon,
>>>>>>>>>> Thanks for emailing me.
>>>>>>>>>> Could you please share a list of fixes, as the link you provided is
>>>>>>>>>> not working.
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <
>>>>>>>>>> dongj...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> If you are observing correctness issues, you may hit some old
>>>>>>>>>>> (and fixed) correctness issues.
>>>>>>>>>>>
>>>>>>>>>>> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31
>>>>>>>>>>> correctness issues.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>>>>>>>
>>>>>>>>>>> There are more fixes in 3.3 and 3.4 and 3.5, too.
>>>>>>>>>>>
>>>>>>>>>>> Please use the latest version, Apache Spark 3.5.1, because
>>>>>>>>>>> Apache Spark 3.2 and 3.3 have reached End-Of-Support status in the
>>>>>>>>>>> community.
>>>>>>>>>>>
>>>>>>>>>>> It would be helpful if you could report any correctness issues with
>>>>>>>>>>> Apache Spark 3.5.1.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dongjoon.
>>>>>>>>>>>
>>>>>>>>>>> On 2024/02/29 15:04:41 Prem Sahoo wrote:
>>>>>>>>>>> > When a Spark job shows FetchFailedException it creates a few
>>>>>>>>>>> > duplicate records and we also see a few missing records; please
>>>>>>>>>>> > explain why. We have a scenario where the Spark job complains of
>>>>>>>>>>> > FetchFailedException as one of the data nodes got rebooted in the
>>>>>>>>>>> > middle of the job running.
>>>>>>>>>>> >
>>>>>>>>>>> > Now due to this we have a few duplicate and a few missing records.
>>>>>>>>>>> > Why is Spark not handling this scenario correctly? We shouldn't
>>>>>>>>>>> > miss any data and we shouldn't create duplicate data.
>>>>>>>>>>> >
>>>>>>>>>>> > I am using the Spark 3.2.0 version.
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>>>>>>
>>>>>>>>>>>
