super :(

On Mon, Mar 4, 2024 at 6:19 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> "... in a nutshell  if fetchFailedException occurs due to data node reboot
> then it  can create duplicate / missing data  .   so this is more of
> hardware(env issue ) rather than spark issue ."
>
> As an overall conclusion your point is correct but again the answer is not
> binary.
>
> Spark core relies on a distributed file system to store data across data
> nodes. When Spark needs to process data, it fetches the required blocks
> from the data nodes.* FetchFailedException*: means  that Spark
> encountered an error while fetching data blocks from a data node. If a data
> node reboots unexpectedly, it becomes unavailable to Spark for a
> period. During this time, Spark might attempt to fetch data blocks from the
> unavailable node, resulting in the FetchFailedException.. Depending on the
> timing and nature of the reboot and data access, this exception can lead
> to:the following:
>
>    - Duplicate Data: If Spark retries the fetch operation successfully
>    after the reboot, it might end up processing the same data twice, leading
>    to duplicates.
>    - Missing Data: If Spark cannot fetch all required data blocks due to
>    the unavailable data node, some data might be missing from the processing
>    results.
>
> The root cause of this issue lies in the data node reboot itself. So we
> can conclude that it is not a  problem with Spark core functionality but
> rather an environmental issue within the distributed storage systemL  You
> need to ensure that your nodes are stable and minimise unexpected reboots
> for whatever reason. Look at the host logs  or run /usr/bin/dmesg to see
> what happened..
>
> Good luck
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Mon, 4 Mar 2024 at 01:30, Prem Sahoo <prem.re...@gmail.com> wrote:
>
>> thanks Mich, in a nutshell  if fetchFailedException occurs due to data
>> node reboot then it  can create duplicate / missing data  .   so this is
>> more of hardware(env issue ) rather than spark issue .
>>
>>
>>
>> On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> It seems to me that there are issues related to below
>>>
>>> *<Prem> I think when a task failed in between  and retry task started
>>> and completed it may create duplicate as failed task has some data + retry
>>> task has  full data.  but my question is why spark keeps delta data or
>>> according to you if speculative and original task completes generally spark
>>> kills one of the tasks to get rid of dups data.  when a data node is
>>> rebooted then spark fault tolerant should go to other nodes isn't it ? then
>>> why it has missing data.*
>>>
>>> Spark is designed to be fault-tolerant through lineage and
>>> recomputation. However, there are scenarios where speculative execution or
>>> task retries might lead to duplicated or missing data. So what are these?
>>>
>>> - Task Failure and Retry: You are correct that a failed task might have
>>> processed some data before encountering the FetchFailedException. If a
>>> retry succeeds, it would process the entire data partition again, leading
>>> to duplicates. When a task fails, Spark may recompute the lost data by
>>> recomputing the lost task on another node.  The output of the retried task
>>> is typically combined with the output of the original task during the final
>>> stage of the computation. This combination is done to handle scenarios
>>> where the original task partially completed and generated some output
>>> before failing. Spark does not intentionally store partially processed
>>> data. However, due to retries and speculative execution, duplicate
>>> processing can occur. To the best of my knowledge, Spark itself doesn't
>>> have a mechanism to identify and eliminate duplicates automatically. While
>>> Spark might sometimes kill speculative tasks if the original one finishes,
>>> it is not a guaranteed behavior. This depends on various factors like
>>> scheduling and task dependencies.
>>>
>>> - Speculative Execution: Spark supports speculative execution, where the
>>> same task is launched on multiple executors simultaneously. The result of
>>> the first completed task is used, and the others are usually killed to
>>> avoid duplicated results. However, speculative execution might introduce
>>> some duplication in the final output if tasks on different executors
>>> complete successfully.
>>>
>>> - Node Reboots and Fault Tolerance: If the data node reboot leads to
>>> data corruption or loss, that data might be unavailable to Spark. Even with
>>> fault tolerance, Spark cannot recover completely missing data. Fault
>>> tolerance focuses on recovering from issues like executor failures, not
>>> data loss on storage nodes. Overall, Spark's fault tolerance is designed to
>>> handle executor failures by rescheduling tasks on other available executors
>>> and temporary network issues by retrying fetches based on configuration.
>>>
>>> Here are some stuff to consider:
>>>
>>> - Minimize retries: Adjust spark.shuffle.io.maxRetries to a lower value
>>> such as  1 or 2 to reduce the chance of duplicate processing attempts, if
>>> retries are suspected to be a source.
>>> - Disable speculative execution if needed: Consider disabling
>>> speculative execution (spark.speculation=false) if duplicates are a major
>>> concern. However, this might impact performance.
>>> - Data persistence: As mentioned in the previous reply, persist
>>> intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity
>>> is critical. This ensures data availability even during node failures.
>>> - Data validation checks: Implement data validation checks after
>>> processing to identify potential duplicates or missing data.
>>> HTH
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>>
>>> On Sat, 2 Mar 2024 at 01:43, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>
>>>> Hello Mich,
>>>> thanks for your reply.
>>>>
>>>> As an engineer I can chip in. You may have partial execution and
>>>> retries  meaning when spark encounters a *FetchFailedException*, it
>>>> may retry fetching the data from the unavailable (the one being rebooted)
>>>> node a few times before marking it permanently unavailable. However, if the
>>>> rebooted node recovers quickly within this retry window, some executors
>>>> might successfully fetch the data after a retry. *This leads to
>>>> duplicate processing of the same data partition*.
>>>>
>>>> <Prem> data node reboot is taking more than 20 mins and our config
>>>> spark.network.timeout=300s so we don't have dupls for the above reason.
>>>> I am not sure this one applies to your spark version but spark may
>>>> speculatively execute tasks on different executors to improve
>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>> speculative task might be launched on another executor. This is where fun
>>>> and games start. If the unavailable node recovers before the speculative
>>>> task finishes, both the original and speculative tasks might complete
>>>> successfully,* resulting in duplicates*. With regard to missing data,
>>>> if the data node reboot leads to data corruption or loss, some data
>>>> partitions might be completely unavailable. In this case, spark may skip
>>>> processing that missing data, leading to missing data in the final output.
>>>>
>>>> <Prem> I think when a task failed in between  and retry task started
>>>> and completed it may create duplicate as failed task has some data + retry
>>>> task has  full data.  but my question is why spark keeps delta data or
>>>> according to you if speculative and original task completes generally spark
>>>> kills one of the tasks to get rid of dups data.  when a data node is
>>>> rebooted then spark fault tolerant should go to other nodes isn't it ? then
>>>> why it has missing data.
>>>> Potential remedies: Spark offers some features to mitigate these
>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>> data loss:. You can adjust parameters like *spark.shuffle.retry.wa*it
>>>> and *spark.speculation* to control retry attempts and speculative
>>>> execution behavior. Lineage tracking is there to help. Spark can track data
>>>> lineage, allowing you to identify potentially corrupted or missing data in
>>>> some cases. You can consider persisting intermediate data results to a
>>>> reliable storage (like HDFS or GCS or another cloud storage) to avoid data
>>>> loss in case of node failures.  Your mileage varies as it adds additional
>>>> processing overhead but can ensure data integrity.
>>>>
>>>> <Prem> How spark will handle these without a checkpoint as it will slow
>>>> down the process .  I have data loss or duplication is due to
>>>> fetchFailedException as a part of data node reboot.
>>>> I have few config to minimize fetchFailedException
>>>> spark.network.timeout=300s
>>>> spark.reducer.maxReqsInFlight=4
>>>> spark.shuffle.io.retryWait=30s
>>>> spark.shuffle.io.maxRetries=3
>>>>
>>>> When we get a fetchFailedException due to data node reboot then spark
>>>> should handle it gracefully isn't it ?
>>>> or how to handle it ?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Your point -> "When Spark job shows FetchFailedException it creates
>>>>> few duplicate data and  we see few data also missing , please explain why.
>>>>> We have scenario when  spark job complains *FetchFailedException as
>>>>> one of the data node got ** rebooted middle of job running ."*
>>>>>
>>>>> As an engineer I can chip in. You may have partial execution and
>>>>> retries  meaning when spark encounters a *FetchFailedException*, it
>>>>> may retry fetching the data from the unavailable (the one being rebooted)
>>>>> node a few times before marking it permanently unavailable. However, if 
>>>>> the
>>>>> rebooted node recovers quickly within this retry window, some executors
>>>>> might successfully fetch the data after a retry. *This leads to
>>>>> duplicate processing of the same data partition*.
>>>>>
>>>>> I am not sure this one applies to your spark version but spark may
>>>>> speculatively execute tasks on different executors to improve
>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>> speculative task might be launched on another executor. This is where fun
>>>>> and games start. If the unavailable node recovers before the speculative
>>>>> task finishes, both the original and speculative tasks might complete
>>>>> successfully,* resulting in duplicates*. With regard to missing data,
>>>>> if the data node reboot leads to data corruption or loss, some data
>>>>> partitions might be completely unavailable. In this case, spark may skip
>>>>> processing that missing data, leading to missing data in the final output.
>>>>>
>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>>> data loss:. You can adjust parameters like *spark.shuffle.retry.wa*it
>>>>> and *spark.speculation* to control retry attempts and speculative
>>>>> execution behavior. Lineage tracking is there to help. Spark can track 
>>>>> data
>>>>> lineage, allowing you to identify potentially corrupted or missing data in
>>>>> some cases. You can consider persisting intermediate data results to a
>>>>> reliable storage (like HDFS or GCS or another cloud storage) to avoid data
>>>>> loss in case of node failures.  Your mileage varies as it adds additional
>>>>> processing overhead but can ensure data integrity.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed . It is essential to note
>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>> expert opinions (Werner
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>>
>>>>>
>>>>> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <prem.re...@gmail.com> wrote:
>>>>>
>>>>>> Hello All,
>>>>>> in the list of JIRAs i didn't find anything related to
>>>>>> fetchFailedException.
>>>>>>
>>>>>> as mentioned above
>>>>>>
>>>>>> "When Spark job shows FetchFailedException it creates few duplicate
>>>>>> data and we see few data also missing , please explain why. We have a
>>>>>> scenario when spark job complains FetchFailedException as one of the data
>>>>>> nodes got rebooted in the middle of job running .
>>>>>> Now due to this we have few duplicate data and few missing data . Why
>>>>>> is spark not handling this scenario correctly ? kind of we shouldn't miss
>>>>>> any data and we shouldn't create duplicate data . "
>>>>>>
>>>>>> We have to rerun the job again to fix this data quality issue .
>>>>>> Please let me know why this case is not handled properly by Spark ?
>>>>>>
>>>>>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <
>>>>>> dongjoon.h...@gmail.com> wrote:
>>>>>>
>>>>>>> Please use the url as thr full string including '()' part.
>>>>>>>
>>>>>>> Or you can seach directly at ASF Jira with 'Spark' project and three
>>>>>>> labels, 'Correctness', 'correctness' and 'data-loss'.
>>>>>>>
>>>>>>> Dongjoon
>>>>>>>
>>>>>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <prem.re...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Dongjoon,
>>>>>>>> Thanks for emailing me.
>>>>>>>> Could you please share a list of fixes  as the link provided by you
>>>>>>>> is not working.
>>>>>>>>
>>>>>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <dongj...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> If you are observing correctness issues, you may hit some old (and
>>>>>>>>> fixed) correctness issues.
>>>>>>>>>
>>>>>>>>> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31
>>>>>>>>> correctness issues.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>>>>>
>>>>>>>>> There are more fixes in 3.3 and 3.4 and 3.5, too.
>>>>>>>>>
>>>>>>>>> Please use the latest version, Apache Spark 3.5.1, because Apache
>>>>>>>>> Spark 3.2 and 3.3 are in the End-Of-Support status of the community.
>>>>>>>>>
>>>>>>>>> It would be help if you can report any correctness issues with
>>>>>>>>> Apache Spark 3.5.1.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dongjoon.
>>>>>>>>>
>>>>>>>>> On 2024/02/29 15:04:41 Prem Sahoo wrote:
>>>>>>>>> > When Spark job shows FetchFailedException it creates few
>>>>>>>>> duplicate data and
>>>>>>>>> > we see few data also missing , please explain why. We have
>>>>>>>>> scenario when
>>>>>>>>> > spark job complains FetchFailedException as one of the data node
>>>>>>>>> got
>>>>>>>>> > rebooted middle of job running .
>>>>>>>>> >
>>>>>>>>> > Now due to this we have few duplicate data and few missing data
>>>>>>>>> . Why spark
>>>>>>>>> > is not handling this scenario correctly ? kind of we shouldn't
>>>>>>>>> miss any
>>>>>>>>> > data and we shouldn't create duplicate data .
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > I am using spark3.2.0 version.
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>>>>
>>>>>>>>>

Reply via email to