Re: [DISCUSS] SPIP: Structured Spark Logging

2024-03-02 Thread Mich Talebzadeh
Hi Gengliang,

Thanks for taking the initiative to improve the Spark logging system.
Transitioning to structured logs seems like a worthwhile way to improve our
ability to analyze and troubleshoot Spark jobs, and hopefully to ease future
integration with cloud logging systems. While "Structured Spark Logging"
sounds good, I was wondering if we could consider an alternative name.
Since we already use "Spark Structured Streaming", there might be a slight
initial confusion with the terminology. I must confess that was my initial
reaction, so to speak.

Here are a few alternative names I came up with if I may

   - Spark Log Schema Initiative
   - Centralized Logging with Structured Data for Spark
   - Enhanced Spark Logging with Queryable Format

These options all highlight the key aspects of your proposal, namely schema,
centralized logging and queryability, and might be even clearer for everyone
at first glance.
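
To make the queryability aspect concrete, here is a minimal sketch of how
such logs could be consumed once they carry a fixed schema. It assumes the
logs are emitted as JSON lines with hypothetical fields such as "ts",
"level", "logger" and "msg", and a hypothetical HDFS location; the real
schema and layout will of course be whatever the SPIP settles on.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("log-analysis").getOrCreate()

# Hypothetical location of the collected driver/executor logs
logs = spark.read.json("hdfs:///var/log/spark/structured/*.json")

# With a fixed schema the distributed logs become queryable like any table,
# e.g. count ERROR messages per logger across drivers and executors
(logs.filter(F.col("level") == "ERROR")
     .groupBy("logger")
     .count()
     .orderBy(F.desc("count"))
     .show(truncate=False))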

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my LinkedIn profile

 https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand expert
opinions" (Wernher von Braun).


On Fri, 1 Mar 2024 at 10:07, Gengliang Wang  wrote:

> Hi All,
>
> I propose to enhance our logging system by transitioning to structured
> logs. This initiative is designed to tackle the challenges of analyzing
> distributed logs from drivers, workers, and executors by allowing them to
> be queried using a fixed schema. The goal is to improve the informativeness
> and accessibility of logs, making it significantly easier to diagnose
> issues.
>
> Key benefits include:
>
>- Clarity and queryability of distributed log files.
>- Continued support for log4j, allowing users to switch back to
>traditional text logging if preferred.
>
> The improvement will simplify debugging and enhance productivity without
> disrupting existing logging practices. The implementation is estimated to
> take around 3 months.
>
> *SPIP*:
> https://docs.google.com/document/d/1rATVGmFLNVLmtxSpWrEceYm7d-ocgu8ofhryVs4g3XU/edit?usp=sharing
> *JIRA*: SPARK-47240 
>
> Your comments and feedback would be greatly appreciated.
>


Re: [DISCUSS] SPIP: Structured Spark Logging

2024-03-02 Thread Mridul Muralidharan
Hi Gengliang,

  Thanks for sharing this !
I added a few queries to the proposal doc, and we can continue discussing
there, but overall I am in favor of this.

Regards,
Mridul


On Fri, Mar 1, 2024 at 1:35 AM Gengliang Wang  wrote:

> Hi All,
>
> I propose to enhance our logging system by transitioning to structured
> logs. This initiative is designed to tackle the challenges of analyzing
> distributed logs from drivers, workers, and executors by allowing them to
> be queried using a fixed schema. The goal is to improve the informativeness
> and accessibility of logs, making it significantly easier to diagnose
> issues.
>
> Key benefits include:
>
>- Clarity and queryability of distributed log files.
>- Continued support for log4j, allowing users to switch back to
>traditional text logging if preferred.
>
> The improvement will simplify debugging and enhance productivity without
> disrupting existing logging practices. The implementation is estimated to
> take around 3 months.
>
> *SPIP*:
> https://docs.google.com/document/d/1rATVGmFLNVLmtxSpWrEceYm7d-ocgu8ofhryVs4g3XU/edit?usp=sharing
> *JIRA*: SPARK-47240 
>
> Your comments and feedback would be greatly appreciated.
>


Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

2024-03-02 Thread Mich Talebzadeh
Hi,

It seems to me that the issues relate to the points you raised below:

* I think when a task failed in between and the retry task started and
completed, it may create duplicates, as the failed task has some data plus
the retry task has the full data. But my question is why Spark keeps the
delta data; or, according to you, if the speculative and original tasks both
complete, Spark generally kills one of the tasks to get rid of duplicate
data. When a data node is rebooted, then Spark fault tolerance should go to
other nodes, isn't it? Then why is data missing?*

Spark is designed to be fault-tolerant through lineage and recomputation.
However, there are scenarios where speculative execution or task retries
might lead to duplicated or missing data. So what are these?

- Task Failure and Retry: You are correct that a failed task might have
processed some data before encountering the FetchFailedException. When a
task fails, Spark recomputes it on another node, and if the retry succeeds
it processes the entire data partition again, which can lead to duplicates.
The output of the retried task is typically combined with the output of the
original task during the final stage of the computation, to handle scenarios
where the original task partially completed and generated some output before
failing. Spark does not intentionally store partially processed data, but
due to retries and speculative execution duplicate processing can occur. To
the best of my knowledge, Spark itself does not have a mechanism to identify
and eliminate duplicates automatically (a defensive de-duplication pass is
sketched after these points). While Spark might sometimes kill speculative
tasks if the original one finishes, it is not a guaranteed behavior; this
depends on various factors like scheduling and task dependencies.

- Speculative Execution: Spark supports speculative execution, where the
same task is launched on multiple executors simultaneously. The result of
the first task to complete is used, and the others are usually killed to
avoid duplicated results. However, if copies of the same task on different
executors both manage to complete and commit their output, some duplication
can still end up in the final result.

- Node Reboots and Fault Tolerance: If the data node reboot leads to data
corruption or loss, that data might be unavailable to Spark. Even with
fault tolerance, Spark cannot recover completely missing data. Fault
tolerance focuses on recovering from issues like executor failures, not
data loss on storage nodes. Overall, Spark's fault tolerance is designed to
handle executor failures by rescheduling tasks on other available executors
and temporary network issues by retrying fetches based on configuration.
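
If duplicates do slip through, one pragmatic guard is a de-duplication and
sanity-check pass on the job output before the final write. Below is a
minimal sketch, assuming a hypothetical unique business key called
record_id and hypothetical HDFS paths; adapt it to your own schema and
storage.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dedup-check").getOrCreate()

# Hypothetical output of the job; record_id is assumed to uniquely identify a row
df = spark.read.parquet("hdfs:///tmp/job_output")

# Defensive de-duplication on the business key before the final write
deduped = df.dropDuplicates(["record_id"])

# Simple validation: compare raw vs distinct counts to spot duplicate writes
raw_count = df.count()
distinct_count = deduped.count()
if raw_count != distinct_count:
    print(f"Found {raw_count - distinct_count} duplicate rows")

deduped.write.mode("overwrite").parquet("hdfs:///tmp/job_output_deduped")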

Here are some things to consider:

- Minimize retries: If retries are suspected to be a source of duplicates,
adjust spark.shuffle.io.maxRetries to a lower value such as 1 or 2 to reduce
the chance of duplicate processing attempts (see the configuration sketch
after this list).
- Disable speculative execution if needed: Consider disabling speculative
execution (spark.speculation=false) if duplicates are a major concern.
However, this might impact performance.
- Data persistence: As mentioned in the previous reply, persist
intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity
is critical. This ensures data availability even during node failures.
- Data validation checks: Implement data validation checks after processing
to identify potential duplicates or missing data.
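
For reference, the knobs above could be set along these lines when building
the session. This is only a sketch with illustrative values; tune them
against your own workload rather than copying them verbatim.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("fetch-failure-tuning")
    # fewer shuffle fetch retries means fewer chances of re-processing a partition
    .config("spark.shuffle.io.maxRetries", "2")
    # disable speculative execution if duplicate output is the bigger concern
    .config("spark.speculation", "false")
    # generous network timeout for slow or rebooting nodes
    .config("spark.network.timeout", "300s")
    .getOrCreate()
)
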
HTH
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my LinkedIn profile

 https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand expert
opinions" (Wernher von Braun).


On Sat, 2 Mar 2024 at 01:43, Prem Sahoo  wrote:

> Hello Mich,
> thanks for your reply.
>
> As an engineer I can chip in. You may have partial execution and retries
> meaning that when Spark encounters a *FetchFailedException*, it may retry
> fetching the data from the unavailable (the one being rebooted) node a few
> times before marking it permanently unavailable. However, if the rebooted
> node recovers quickly within this retry window, some executors might
> successfully fetch the data after a retry. *This leads to duplicate
> processing of the same data partition*.
>
> The data node reboot takes more than 20 minutes and our config is
> spark.network.timeout=300s, so we don't have duplicates for the above reason.
> I am not sure this one applies to your Spark version, but Spark may
> speculatively execute tasks on different executors to improve
> performance. If a task fails due to the *FetchFailedException*, a
> speculative task might be launched on another executor. This is where fun
> and games start.