Hello Mich,
thanks for your reply.

As an engineer I can chip in. You may have partial execution and retries,
meaning that when Spark encounters a *FetchFailedException*, it may retry
fetching the data from the unavailable node (the one being rebooted) a few
times before marking it permanently unavailable. However, if the rebooted
node recovers quickly within this retry window, some executors might
successfully fetch the data after a retry. *This can lead to duplicate
processing of the same data partition*.
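
For reference, that retry window is bounded by the standard shuffle I/O
settings; here is a tiny sketch of the relationship (the setting names are
the real Spark configs, the values shown are purely illustrative):

import org.apache.spark.sql.SparkSession

// Sketch only: the settings that bound the shuffle-fetch retry window.
// Roughly, window = spark.shuffle.io.maxRetries * spark.shuffle.io.retryWait.
val spark = SparkSession.builder()
  .appName("fetch-retry-window-example")
  .config("spark.shuffle.io.maxRetries", "3")   // attempts per failed fetch (Spark default)
  .config("spark.shuffle.io.retryWait", "5s")   // wait between attempts (Spark default)
  .getOrCreate()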

<Prem> The data node reboot takes more than 20 minutes and our config is
spark.network.timeout=300s, so we don't get duplicates for the above reason.
I am not sure this one applies to your Spark version, but Spark may
speculatively execute tasks on different executors to improve
performance. If a task fails due to the *FetchFailedException*, a
speculative task might be launched on another executor. This is where the
fun and games start. If the unavailable node recovers before the speculative
task finishes, both the original and the speculative task might complete
successfully, *resulting in duplicates*. With regard to missing data, if
the data node reboot leads to data corruption or loss, some data partitions
might be completely unavailable. In this case, Spark may skip processing
that missing data, leading to gaps in the final output.
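
If speculation turns out to be the culprit, the knobs are the standard
speculation settings; a minimal sketch (values are illustrative, not a
recommendation):

import org.apache.spark.sql.SparkSession

// Sketch only: the standard speculation settings.
// Disabling speculation removes this particular source of duplicate attempts,
// at the cost of losing protection against straggler tasks.
val spark = SparkSession.builder()
  .appName("speculation-settings-example")
  .config("spark.speculation", "false")           // or "true" with the tuning below
  .config("spark.speculation.quantile", "0.9")    // fraction of tasks finished before speculating
  .config("spark.speculation.multiplier", "3")    // how much slower than the median before speculating
  .getOrCreate()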

<Prem> I think that when a task fails partway through and a retried task then
starts and completes, it may create duplicates, since the failed task has
written some data and the retried task writes the full data. But my question
is: why does Spark keep the partial (delta) data? Also, as you say, if both
the speculative and the original task complete, Spark generally kills one of
them to get rid of the duplicate data. And when a data node is rebooted,
Spark's fault tolerance should fail over to other nodes, shouldn't it? Then
why is there missing data?
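
(Not an answer to the "why", but as a stop-gap: if the data has a natural
unique key, the duplicates can be dropped before the final write. A sketch,
assuming a hypothetical key column and output path:)

import org.apache.spark.sql.DataFrame

// Sketch only: deduplicate on a business key before the final write.
// "event_id" and the output path are hypothetical placeholders, not from this thread.
def writeDeduped(df: DataFrame, keyCol: String, outputPath: String): Unit = {
  df.dropDuplicates(keyCol)
    .write
    .mode("overwrite")
    .parquet(outputPath)
}

// e.g. writeDeduped(resultDf, "event_id", "hdfs:///path/to/output")
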
Potential remedies: Spark offers some features to mitigate these issues,
but it might not guarantee complete elimination of duplicates or data
loss. You can adjust parameters like *spark.shuffle.io.retryWait* and
*spark.speculation* to control retry attempts and speculative execution
behavior. Lineage tracking is there to help: Spark can track data lineage,
allowing you to identify potentially corrupted or missing data in some
cases. You can also consider persisting intermediate results to reliable
storage (like HDFS, GCS, or another cloud storage) to avoid data loss in
case of node failures. Your mileage may vary, as it adds processing
overhead but can help ensure data integrity.
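
A minimal sketch of that last suggestion, i.e. persisting an intermediate
result to reliable storage and reading it back (the path and the Parquet
format are placeholders, not a prescription):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch only: materialize an expensive intermediate result to reliable storage,
// so that a later node failure does not have to refetch its shuffle output.
def materialize(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
  df.write.mode("overwrite").parquet(path)   // e.g. an HDFS or GCS location
  spark.read.parquet(path)
}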

<Prem> How will Spark handle these without a checkpoint, since checkpointing
will slow down the process? The data loss or duplication I see is due to
FetchFailedException as part of the data node reboot.
I have a few configs set to minimize FetchFailedException:
spark.network.timeout=300s
spark.reducer.maxReqsInFlight=4
spark.shuffle.io.retryWait=30s
spark.shuffle.io.maxRetries=3
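
With the values above, the fetch-retry window is roughly 3 retries x 30s =
about 90 seconds per failed fetch, far shorter than a 20-minute reboot, so
Spark gives up on that node and falls back to re-running the affected stages.
The same settings expressed programmatically, as a sketch (values copied from
the list above):

import org.apache.spark.SparkConf

// Sketch only: the settings from the list above, set on a SparkConf.
val conf = new SparkConf()
  .set("spark.network.timeout", "300s")
  .set("spark.reducer.maxReqsInFlight", "4")
  .set("spark.shuffle.io.retryWait", "30s")
  .set("spark.shuffle.io.maxRetries", "3")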

When we get a FetchFailedException due to a data node reboot, Spark should
handle it gracefully, shouldn't it?
Or how should we handle it?





On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> Your point -> "When Spark job shows FetchFailedException it creates few
> duplicate data and  we see few data also missing , please explain why. We
> have scenario when  spark job complains *FetchFailedException as one of
> the data node got ** rebooted middle of job running ."*
>
> As an engineer I can chip in. You may have partial execution and retries
> meaning when spark encounters a *FetchFailedException*, it  may retry
> fetching the data from the unavailable (the one being rebooted) node a few
> times before marking it permanently unavailable. However, if the rebooted
> node recovers quickly within this retry window, some executors might
> successfully fetch the data after a retry. *This leads to duplicate
> processing of the same data partition*.
>
> I am not sure this one applies to your spark version but spark may
> speculatively execute tasks on different executors to improve
> performance. If a task fails due to the *FetchFailedException*, a
> speculative task might be launched on another executor. This is where fun
> and games start. If the unavailable node recovers before the speculative
> task finishes, both the original and speculative tasks might complete
> successfully,* resulting in duplicates*. With regard to missing data, if
> the data node reboot leads to data corruption or loss, some data partitions
> might be completely unavailable. In this case, spark may skip processing
> that missing data, leading to missing data in the final output.
>
> Potential remedies: Spark offers some features to mitigate these issues,
> but it might not guarantee complete elimination of duplicates or data
> loss:. You can adjust parameters like *spark.shuffle.retry.wa*it and
> *spark.speculation* to control retry attempts and speculative execution
> behavior. Lineage tracking is there to help. Spark can track data lineage,
> allowing you to identify potentially corrupted or missing data in some
> cases. You can consider persisting intermediate data results to a reliable
> storage (like HDFS or GCS or another cloud storage) to avoid data loss in
> case of node failures.  Your mileage varies as it adds additional
> processing overhead but can ensure data integrity.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <prem.re...@gmail.com> wrote:
>
>> Hello All,
>> in the list of JIRAs i didn't find anything related to
>> fetchFailedException.
>>
>> as mentioned above
>>
>> "When Spark job shows FetchFailedException it creates few duplicate data
>> and we see few data also missing , please explain why. We have a scenario
>> when spark job complains FetchFailedException as one of the data nodes got
>> rebooted in the middle of job running .
>> Now due to this we have few duplicate data and few missing data . Why is
>> spark not handling this scenario correctly ? kind of we shouldn't miss any
>> data and we shouldn't create duplicate data . "
>>
>> We have to rerun the job again to fix this data quality issue . Please
>> let me know why this case is not handled properly by Spark ?
>>
>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <dongjoon.h...@gmail.com>
>> wrote:
>>
>>> Please use the url as thr full string including '()' part.
>>>
>>> Or you can seach directly at ASF Jira with 'Spark' project and three
>>> labels, 'Correctness', 'correctness' and 'data-loss'.
>>>
>>> Dongjoon
>>>
>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <prem.re...@gmail.com> wrote:
>>>
>>>> Hello Dongjoon,
>>>> Thanks for emailing me.
>>>> Could you please share a list of fixes  as the link provided by you is
>>>> not working.
>>>>
>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <dongj...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> If you are observing correctness issues, you may hit some old (and
>>>>> fixed) correctness issues.
>>>>>
>>>>> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31 correctness
>>>>> issues.
>>>>>
>>>>>
>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>
>>>>> There are more fixes in 3.3 and 3.4 and 3.5, too.
>>>>>
>>>>> Please use the latest version, Apache Spark 3.5.1, because Apache
>>>>> Spark 3.2 and 3.3 are in the End-Of-Support status of the community.
>>>>>
>>>>> It would be help if you can report any correctness issues with Apache
>>>>> Spark 3.5.1.
>>>>>
>>>>> Thanks,
>>>>> Dongjoon.
>>>>>
>>>>> On 2024/02/29 15:04:41 Prem Sahoo wrote:
>>>>> > When Spark job shows FetchFailedException it creates few duplicate
>>>>> data and
>>>>> > we see few data also missing , please explain why. We have scenario
>>>>> when
>>>>> > spark job complains FetchFailedException as one of the data node got
>>>>> > rebooted middle of job running .
>>>>> >
>>>>> > Now due to this we have few duplicate data and few missing data .
>>>>> Why spark
>>>>> > is not handling this scenario correctly ? kind of we shouldn't miss
>>>>> any
>>>>> > data and we shouldn't create duplicate data .
>>>>> >
>>>>> >
>>>>> >
>>>>> > I am using spark3.2.0 version.
>>>>> >
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>
>>>>>
