[ 
https://issues.apache.org/jira/browse/SPARK-46706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joey Pereira updated SPARK-46706:
---------------------------------
    Attachment: Screenshot 2024-01-05 at 3.51.52 PM.png

> percentile_approx regression between at least Spark 3.1
> -------------------------------------------------------
>
>                 Key: SPARK-46706
>                 URL: https://issues.apache.org/jira/browse/SPARK-46706
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, SQL
>    Affects Versions: 3.1.3
>            Reporter: Joey Pereira
>            Priority: Major
>         Attachments: Screenshot 2024-01-05 at 3.51.52 PM.png, Screenshot 2024-01-05 at 3.53.10 PM.png
>
>
> This was originally reported in SPARK-40499; I'm opening a new issue because
> that one was closed. It's not entirely clear whether this regression still
> exists on HEAD, but I haven't seen any changes that I would expect to address
> it. Here's what I reported in the other ticket.
> ---
> Chiming in, I'm currently hitting a similar issue while doing upgrades from 
> Spark 2.4 to Spark 3.1.
> In our case, we have an application with two queries that take
> {{percentile_approx}} of a fairly large dataset and reduce it to only a few
> rows (reading 37 billion rows and aggregating to ~100 and ~7000 rows
> respectively, with different groupBy keys). This is roughly what the query
> looks like:
> {code:java}
> select
>     col1,
>     percentile_approx(col2, 0.95) as q_95,
>     percentile_approx(col2, 0.99) as q_99,
>     percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
> from data
> group by 1
> {code}
> I do recognize this could be written more efficiently as
> {{percentile_approx(col2, array(0.95, 0.99))}} to save on computation; this
> is just the snippet as it was when I stumbled on it :)
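> A sketch of that rewrite (not run against our data; it assumes the array form
> returns an {{array}} indexed from 0, in the same order as the percentages):
> {code:sql}
> select
>     col1,
>     qs[0] as q_95,
>     qs[1] as q_99,
>     qs[1] - qs[0] as q_diff
> from (
>     -- a single aggregate per group, queried for both percentages
>     select col1, percentile_approx(col2, array(0.95, 0.99)) as qs
>     from data
>     group by col1
> ) t
> {code}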
> We've observed that the shuffle write stage takes roughly the same time (and
> writes roughly the same size), but the shuffle read is drastically slower.
> Note that in this application the shuffle read output is also collected for a
> broadcast. The shuffle read went from 15 minutes on Spark 2.4 to 5.7 hours on
> Spark 3.1, and that was after reducing the {{percentile_approx}} precision
> from the default (10000) to 100 on Spark 3.1. When left at 10000, the stage
> ran for more than 15 hours and hit some of our application time limits.
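> For reference, that precision is the optional third ({{accuracy}}) argument of
> {{percentile_approx}}. Spark documents {{1.0/accuracy}} as the relative error,
> so 100 means roughly 1% versus 0.01% at the default. A sketch of what the
> reduced version looks like, reusing the column names from the snippet above:
> {code:sql}
> select
>     col1,
>     -- accuracy = 100 instead of the default 10000 (coarser, smaller sketches)
>     percentile_approx(col2, 0.95, 100) as q_95,
>     percentile_approx(col2, 0.99, 100) as q_99
> from data
> group by 1
> {code}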
> *Spark 2.4*
> !Screenshot 2024-01-05 at 3.53.10 PM.png!
>  - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
>  - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size
> *Spark 3.1*
> !Screenshot 2024-01-05 at 3.51.52 PM.png!
>  - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 20.3 GiB shuffle write size
>  - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size
> Before digging further into this, I thought it might have been an AQE
> issue. That was incorrect; in fact, the post-aggregation task/partition
> count was much worse without AQE, at about 150000.
> I dug into relevant changes between Spark 2.4 and 3.1, as well as changes
> since 3.1, to see if there's anything significant. I only looked at PRs
> against {{QuantileSummaries.scala}} and {{ApproximatePercentile.scala}},
> and ignored docs, formatting, refactoring, and very simple behavioural
> changes. These are the notable changes, with some commentary.
> For changes between 2.4 and 3.1:
>  # [https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
>  which substantially re-implements the QuantileSummaries {{merge}} algorithm.
> This seems potentially relevant, especially since the shuffle-read stage, which
> is likely doing little besides merging, is the one struggling. That being said,
> I would have expected this to impact the partial_percentile_approx calculations
> too.
>  # [https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
>  -appears to change the behaviour in evaluation for {{case when}} /
> conditional inputs. This is possibly relevant, but our SQL code only plumbs a
> primitive in. Our actual job is a little more complex and the SQL is over a
> subquery, but the subquery is not manipulating the column used for
> {{percentile_approx}}.- I misunderstood this change initially. It looks
> like an optimization to avoid reconstructing the data type for some parent
> plan nodes. Our SQL code does consume percentiles in some CaseWhen clauses,
> so I'll need to dig more into this.
> I didn't look much further into other code. I did not see relevant changes
> in {{AggregatingAccumulator}}.
> For changes from 3.1 onwards to {{ApproximatePercentile}} and
> {{QuantileSummaries}}:
>  # (in 3.2) [https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
>  which appears to optimize the {{ApproxPercentiles.getPercentiles}} function.
> It seems to be executed only on the final result collection, and it optimizes
> cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}}
> (calculations of multiple percentile values).
>  # (in 3.2) [https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
>  which appears to change some plan interfaces. It reads as though it only
> affects query compilation performance, rather than execution performance.
> Because I'm not seeing any changes that might address this performance
> issue, I'm led to believe this is still an existing problem in Spark.
> I have some follow-up I'll do to dig into this further, namely:
>  - Test the job with {{percentile}} to see if it performs better, although 
> I'm a bit dubious we will get this to run with 37 billion input rows
>  - Test the job on Spark 3.3, which we have production-ready in our 
> environment
>  - Observe some of the thread dumps / profiling for the stage with poor 
> performance
>  - Isolate, or otherwise rewrite, the ApproximatePercentile going into a 
> CaseWhen in our specific SQL to narrow down if that change is related
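> For the first item, a sketch of the exact variant (untested; as far as I know
> {{percentile}} keeps a count per distinct {{col2}} value in each group, so its
> memory use grows with the data rather than being bounded like a sketch):
> {code:sql}
> select
>     col1,
>     percentile(col2, array(0.95, 0.99)) as qs  -- exact, not approximate
> from data
> group by 1
> {code}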
> Here's an expanded version of our query, with irrelevant columns removed. I
> figured I'd include this after realizing the relevance of the CaseWhen parent
> node.
> {code}
> with source_data as (
>   select col1, col2 -- ... additional columns
>   from <table>
> ), percentiles as (
>   select
>       col1,
>       approx_percentile(col2, 0.95) as q_95,
>       approx_percentile(col2, 0.99) as q_99, 
>       approx_percentile(col2, 0.99) - approx_percentile(col2, 0.95) as q_diff
>   from source_data
>   group by 1
> ), calculations as (
>   select
>     col1,
>     case 
>       when q_diff > 5 then q_95 + 5
>       else q_99
>     end as calculated_val
>   from percentiles
> )
> select
>   /*+ BROADCAST (c) */
>   source_data.*, c.calculated_val
> from source_data
> left join calculations as c using(col1)
> -- ... additional joins / broadcasts
> {code}
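> To isolate the CaseWhen, one option (a sketch, not yet run) is to compute
> {{calculated_val}} with {{least}} instead. It gives the same value as the
> {{case}} expression, since {{q_95}} and {{q_99}} aggregate the same column and
> are therefore null together; {{<table>}} is the same placeholder as above.
> {code:sql}
> with percentiles as (
>   select
>       col1,
>       approx_percentile(col2, 0.95) as q_95,
>       approx_percentile(col2, 0.99) as q_99
>   from <table>
>   group by 1
> ), calculations as (
>   select
>     col1,
>     -- same result as: case when q_99 - q_95 > 5 then q_95 + 5 else q_99 end
>     least(q_99, q_95 + 5) as calculated_val
>   from percentiles
> )
> select * from calculations
> {code}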
> ---
> Later on, I found another app of ours with similarly unbearable regressions.
> The app ran in under 2 hours before upgrading, and now takes more than 24
> hours on Spark 3. It's a lot simpler, so this is more-or-less the SQL it's
> running (with a number of other internal group-by fields removed for brevity).
> {code:sql}
> with t as (
>   select
>     method,
>     response_code,
>     if(response_code >= 200 and response_code < 300, duration_ms, null) as duration_ms_2xx
>   from logs
> )
> select
>   method,
>   count(*) as cnt,
>   sum(case when response_code >= 200 and response_code < 300 then 1 else 0 end) as cnt_2xx,
>   sum(case when response_code >= 300 and response_code < 400 then 1 else 0 end) as cnt_3xx,
>   sum(case when response_code >= 400 and response_code < 500 then 1 else 0 end) as cnt_4xx,
>   sum(case when response_code >= 500 then 1 else 0 end) as cnt_5xx,
>   approx_percentile(duration_ms_2xx, 0.5) as latency_p50,
>   approx_percentile(duration_ms_2xx, 0.9) as latency_p90,
>   approx_percentile(duration_ms_2xx, 0.99) as latency_p99,
>   approx_percentile(duration_ms_2xx, 0.999) as latency_p999
> from t
> group by 1
> {code}
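> A variant that could help isolate the conditional (a sketch, assuming Spark
> 3.0+, where aggregates accept a {{FILTER (WHERE ...)}} clause) drops the
> {{if}} from the projection and leaves the rest of the percentile logic alone:
> {code:sql}
> select
>   method,
>   count(*) as cnt,
>   -- each sketch sees the same non-null inputs as duration_ms_2xx did
>   approx_percentile(duration_ms, 0.5)
>     filter (where response_code >= 200 and response_code < 300) as latency_p50,
>   approx_percentile(duration_ms, 0.9)
>     filter (where response_code >= 200 and response_code < 300) as latency_p90,
>   approx_percentile(duration_ms, 0.99)
>     filter (where response_code >= 200 and response_code < 300) as latency_p99,
>   approx_percentile(duration_ms, 0.999)
>     filter (where response_code >= 200 and response_code < 300) as latency_p999
> from logs
> group by 1
> {code}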
> This is simple enough that I'm a bit more convinced it could be the CaseWhen
> change, given that a conditional is a common factor in both snippets.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
