Thanks Mich. In a nutshell, if a FetchFailedException occurs due to a data
node reboot, it can create duplicate or missing data. So this is more of a
hardware/environment issue than a Spark issue.



On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> It seems to me that the issues relate to the points below:
>
> *<Prem> I think when a task fails in between and a retry task starts and
> completes, it may create duplicates, as the failed task has some data and
> the retry task has the full data. But my question is why Spark keeps the
> delta data; or, per your explanation, if the speculative and original
> tasks both complete, Spark generally kills one of them to get rid of
> duplicate data. When a data node is rebooted, Spark's fault tolerance
> should go to other nodes, shouldn't it? Then why is there missing data?*
>
> Spark is designed to be fault-tolerant through lineage and recomputation.
> However, there are scenarios where speculative execution or task retries
> might lead to duplicated or missing data. What are these scenarios?
>
> - Task failure and retry: You are correct that a failed task might have
> processed some data before encountering the FetchFailedException. If a
> retry succeeds, it processes the entire data partition again, which can
> lead to duplicates. When a task fails, Spark recovers the lost data by
> recomputing the lost task on another node. The output of the retried task
> is typically combined with the output of the original task during the
> final stage of the computation, to handle scenarios where the original
> task partially completed and generated some output before failing. Spark
> does not intentionally store partially processed data; however, due to
> retries and speculative execution, duplicate processing can occur. To the
> best of my knowledge, Spark itself does not have a mechanism to identify
> and eliminate duplicates automatically. While Spark might sometimes kill
> speculative tasks once the original finishes, this is not guaranteed
> behaviour; it depends on factors such as scheduling and task dependencies.
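> Since Spark itself will not remove such duplicates, a common mitigation is
> an explicit de-duplication step keyed on a stable record identifier. A
> minimal sketch in plain Python (the record layout is hypothetical; in
> PySpark the equivalent would be dropDuplicates on the key column):

```python
# Hypothetical records: the same partition's output landing twice after a
# task retry. Each record carries a stable identifier.
records = [
    {"record_id": 1, "value": "a"},
    {"record_id": 2, "value": "b"},
    {"record_id": 1, "value": "a"},  # duplicate produced by the retried task
]

def dedup_by_key(rows, key):
    """Keep the first occurrence of each key, like dropDuplicates([key])."""
    seen, out = set(), []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            out.append(row)
    return out

deduped = dedup_by_key(records, "record_id")  # two unique records remain
```

> In PySpark this would be df.dropDuplicates(["record_id"]) before the final
> write, at the cost of an extra shuffle.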
>
> - Speculative execution: Spark supports speculative execution, where the
> same task is launched on multiple executors simultaneously. The result of
> the first task to complete is used, and the others are usually killed to
> avoid duplicated results. However, speculative execution might introduce
> some duplication in the final output if tasks on different executors both
> complete successfully.
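> If you keep speculation enabled, its aggressiveness can be tuned rather
> than switched off entirely. A sketch with the relevant settings (these are
> real Spark configuration keys, but the values are illustrative only):

```python
# Speculation-related settings, collected as plain key/value pairs and
# rendered as spark-submit flags for illustration.
speculation_conf = {
    "spark.speculation": "true",
    "spark.speculation.quantile": "0.90",   # fraction of tasks that must finish before speculating
    "spark.speculation.multiplier": "2.0",  # how much slower than the median a task must be
}

def as_submit_flags(conf):
    """Render settings as spark-submit --conf arguments."""
    return ["--conf %s=%s" % (k, conf[k]) for k in sorted(conf)]

flags = as_submit_flags(speculation_conf)
```

> Raising the quantile and multiplier makes speculation rarer, which in turn
> narrows the window in which duplicate task attempts can both complete.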
>
> - Node Reboots and Fault Tolerance: If the data node reboot leads to data
> corruption or loss, that data might be unavailable to Spark. Even with
> fault tolerance, Spark cannot recover completely missing data. Fault
> tolerance focuses on recovering from issues like executor failures, not
> data loss on storage nodes. Overall, Spark's fault tolerance is designed to
> handle executor failures by rescheduling tasks on other available executors
> and temporary network issues by retrying fetches based on configuration.
>
> Here are some things to consider:
>
> - Minimize retries: Adjust spark.shuffle.io.maxRetries to a lower value,
> such as 1 or 2, to reduce the chance of duplicate processing attempts if
> retries are suspected to be the source.
> - Disable speculative execution if needed: Consider disabling speculative
> execution (spark.speculation=false) if duplicates are a major concern.
> However, this might impact performance.
> - Data persistence: As mentioned in the previous reply, persist
> intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity
> is critical. This ensures data availability even during node failures.
> - Data validation checks: Implement data validation checks after
> processing to identify potential duplicates or missing data.
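> As a sketch of such a validation check (the keys here are hypothetical),
> comparing the set of keys the job wrote against the set expected from the
> input, to flag both duplicates and missing data:

```python
from collections import Counter

def validate(expected_keys, written_keys):
    """Report keys written more than once and keys never written."""
    counts = Counter(written_keys)
    duplicates = sorted(k for k, n in counts.items() if n > 1)
    missing = sorted(set(expected_keys) - set(counts))
    return {"duplicates": duplicates, "missing": missing}

# Hypothetical run: key 2 was written twice, key 3 was never written.
report = validate(expected_keys=[1, 2, 3, 4], written_keys=[1, 2, 2, 4])
```

> In practice the key sets would come from cheap aggregations over the input
> and output datasets (e.g. a groupBy/count in Spark).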
> HTH
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions" (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>
>
> On Sat, 2 Mar 2024 at 01:43, Prem Sahoo <prem.re...@gmail.com> wrote:
>
>> Hello Mich,
>> thanks for your reply.
>>
>> As an engineer I can chip in. You may have partial execution and retries
>> meaning when spark encounters a *FetchFailedException*, it  may retry
>> fetching the data from the unavailable (the one being rebooted) node a few
>> times before marking it permanently unavailable. However, if the rebooted
>> node recovers quickly within this retry window, some executors might
>> successfully fetch the data after a retry. *This leads to duplicate
>> processing of the same data partition*.
>>
>> <Prem> The data node reboot takes more than 20 minutes and our config is
>> spark.network.timeout=300s, so we don't have duplicates for the above reason.
>> I am not sure this one applies to your spark version but spark may
>> speculatively execute tasks on different executors to improve
>> performance. If a task fails due to the *FetchFailedException*, a
>> speculative task might be launched on another executor. This is where fun
>> and games start. If the unavailable node recovers before the speculative
>> task finishes, both the original and speculative tasks might complete
>> successfully,* resulting in duplicates*. With regard to missing data, if
>> the data node reboot leads to data corruption or loss, some data partitions
>> might be completely unavailable. In this case, spark may skip processing
>> that missing data, leading to missing data in the final output.
>>
>> <Prem> I think when a task fails in between and a retry task starts and
>> completes, it may create duplicates, as the failed task has some data and
>> the retry task has the full data. But my question is why Spark keeps the
>> delta data; or, per your explanation, if the speculative and original
>> tasks both complete, Spark generally kills one of them to get rid of
>> duplicate data. When a data node is rebooted, Spark's fault tolerance
>> should go to other nodes, shouldn't it? Then why is there missing data?
>> Potential remedies: Spark offers some features to mitigate these issues,
>> but it might not guarantee complete elimination of duplicates or data
>> loss. You can adjust parameters like *spark.shuffle.io.retryWait* and
>> *spark.speculation* to control retry attempts and speculative execution
>> behavior. Lineage tracking is there to help: Spark can track data lineage,
>> allowing you to identify potentially corrupted or missing data in some
>> cases. You can consider persisting intermediate results to reliable
>> storage (like HDFS, GCS or another cloud storage) to avoid data loss in
>> case of node failures. Your mileage varies, as this adds processing
>> overhead but can ensure data integrity.
>>
>> <Prem> How will Spark handle these without a checkpoint, as checkpointing
>> will slow down the process? The data loss or duplication I see is due to
>> fetchFailedException as part of a data node reboot.
>> I have a few configs to minimize fetchFailedException:
>> spark.network.timeout=300s
>> spark.reducer.maxReqsInFlight=4
>> spark.shuffle.io.retryWait=30s
>> spark.shuffle.io.maxRetries=3
>>
>> When we get a fetchFailedException due to a data node reboot, shouldn't
>> Spark handle it gracefully? If not, how should it be handled?
>>
>>
>>
>>
>>
>> On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Your point -> "When Spark job shows FetchFailedException it creates few
>>> duplicate data and we see few data also missing , please explain why. We
>>> have scenario when spark job complains *FetchFailedException* as one of
>>> the data nodes got rebooted middle of job running."
>>>
>>> As an engineer I can chip in. You may have partial execution and
>>> retries  meaning when spark encounters a *FetchFailedException*, it
>>> may retry fetching the data from the unavailable (the one being rebooted)
>>> node a few times before marking it permanently unavailable. However, if the
>>> rebooted node recovers quickly within this retry window, some executors
>>> might successfully fetch the data after a retry. *This leads to
>>> duplicate processing of the same data partition*.
>>>
>>> I am not sure this one applies to your spark version but spark may
>>> speculatively execute tasks on different executors to improve
>>> performance. If a task fails due to the *FetchFailedException*, a
>>> speculative task might be launched on another executor. This is where fun
>>> and games start. If the unavailable node recovers before the speculative
>>> task finishes, both the original and speculative tasks might complete
>>> successfully,* resulting in duplicates*. With regard to missing data,
>>> if the data node reboot leads to data corruption or loss, some data
>>> partitions might be completely unavailable. In this case, spark may skip
>>> processing that missing data, leading to missing data in the final output.
>>>
>>> Potential remedies: Spark offers some features to mitigate these issues,
>>> but it might not guarantee complete elimination of duplicates or data
>>> loss. You can adjust parameters like *spark.shuffle.io.retryWait* and
>>> *spark.speculation* to control retry attempts and speculative execution
>>> behavior. Lineage tracking is there to help: Spark can track data lineage,
>>> allowing you to identify potentially corrupted or missing data in some
>>> cases. You can consider persisting intermediate results to reliable
>>> storage (like HDFS, GCS or another cloud storage) to avoid data loss in
>>> case of node failures. Your mileage varies, as this adds processing
>>> overhead but can ensure data integrity.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions" (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>
>>>
>>> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>> in the list of JIRAs I didn't find anything related to
>>>> fetchFailedException.
>>>>
>>>> as mentioned above
>>>>
>>>> "When Spark job shows FetchFailedException it creates few duplicate
>>>> data and we see few data also missing , please explain why. We have a
>>>> scenario when spark job complains FetchFailedException as one of the data
>>>> nodes got rebooted in the middle of job running .
>>>> Now due to this we have a few duplicate records and a few missing
>>>> records. Why is Spark not handling this scenario correctly? We shouldn't
>>>> miss any data and we shouldn't create duplicate data."
>>>>
>>>> We have to rerun the job to fix this data quality issue. Please let me
>>>> know why this case is not handled properly by Spark.
>>>>
>>>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <dongjoon.h...@gmail.com>
>>>> wrote:
>>>>
>>>>> Please use the url as the full string including the '()' part.
>>>>>
>>>>> Or you can search directly in ASF Jira with the 'Spark' project and
>>>>> three labels: 'Correctness', 'correctness' and 'data-loss'.
>>>>>
>>>>> Dongjoon
>>>>>
>>>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <prem.re...@gmail.com> wrote:
>>>>>
>>>>>> Hello Dongjoon,
>>>>>> Thanks for emailing me.
>>>>>> Could you please share a list of the fixes, as the link you provided
>>>>>> is not working.
>>>>>>
>>>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <dongj...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> If you are observing correctness issues, you may hit some old (and
>>>>>>> fixed) correctness issues.
>>>>>>>
>>>>>>> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31
>>>>>>> correctness issues.
>>>>>>>
>>>>>>>
>>>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>>>
>>>>>>> There are more fixes in 3.3 and 3.4 and 3.5, too.
>>>>>>>
>>>>>>> Please use the latest version, Apache Spark 3.5.1, because Apache
>>>>>>> Spark 3.2 and 3.3 are in the End-Of-Support status of the community.
>>>>>>>
>>>>>>> It would be helpful if you could report any correctness issues with
>>>>>>> Apache Spark 3.5.1.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dongjoon.
>>>>>>>
>>>>>>> On 2024/02/29 15:04:41 Prem Sahoo wrote:
>>>>>>> > When Spark job shows FetchFailedException it creates few duplicate
>>>>>>> > data and we see few data also missing , please explain why. We have
>>>>>>> > scenario when spark job complains FetchFailedException as one of
>>>>>>> > the data node got rebooted middle of job running .
>>>>>>> >
>>>>>>> > Now due to this we have few duplicate data and few missing data .
>>>>>>> > Why spark is not handling this scenario correctly ? kind of we
>>>>>>> > shouldn't miss any data and we shouldn't create duplicate data .
>>>>>>> >
>>>>>>> > I am using spark3.2.0 version.
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>>
>>>>>>>
