[jira] [Updated] (SPARK-46706) percentile_approx regression since Spark 2.4

2024-01-12 Thread Joey Pereira (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joey Pereira updated SPARK-46706:
-
Summary: percentile_approx regression since Spark 2.4  (was: 
percentile_approx regression between at least Spark 3.1)

> percentile_approx regression since Spark 2.4
> 
>
> Key: SPARK-46706
> URL: https://issues.apache.org/jira/browse/SPARK-46706
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL
>Affects Versions: 3.1.3
>Reporter: Joey Pereira
>Priority: Major
> Attachments: Screenshot 2024-01-05 at 3.51.52 PM.png, Screenshot 
> 2024-01-05 at 3.53.10 PM.png
>
>
> Originally reported in SPARK-40499, I'm opening a new issue as that original 
> one was closed. It's not entirely clear if this regression exists on HEAD, 
> but I haven't seen any changes that I would expect to address this. Here's 
> what I reported in the other ticket.
> ---
> Chiming in, I'm currently hitting a similar issue while doing upgrades from 
> Spark 2.4 to Spark 3.1.
> In our case, we have an application with two queries that take 
> {{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
> (reading 37 billion rows and aggregating to ~100 and ~7000 rows, with 
> different groupBy keys, respectively). This is roughly what the query looks like:
> {code:java}
> select
> col1,
> percentile_approx(col2, 0.95) as q_95,
> percentile_approx(col2, 0.99) as q_99,
> percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
> from data
> group by 1
> {code}
> Also, I do recognize this could be written more optimally as 
> {{percentile_approx(col2, array(0.95, 0.99))}} to save on computation (see the 
> sketch below); this is just the snippet as it was when I stumbled on it :)
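> A hedged sketch of that consolidated form (same table and column names as the 
> snippet above; {{qs[0]}} / {{qs[1]}} use Spark SQL's 0-based array subscript):
> {code:sql}
> select
>   col1,
>   qs[0] as q_95,
>   qs[1] as q_99,
>   qs[1] - qs[0] as q_diff
> from (
>   -- one percentile_approx sketch per group instead of three
>   select
>     col1,
>     percentile_approx(col2, array(0.95, 0.99)) as qs
>   from data
>   group by 1
> ) pre
> {code}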
> We've observed that the shuffle write stage takes roughly the same time (and 
> produces roughly the same amount of data), but the shuffle read is drastically 
> slower. Note that in this application the shuffle read is also collecting for 
> a broadcast. The shuffle read went from 15 minutes on Spark 2.4 to 5.7 hours 
> on Spark 3.1, and that was after reducing the {{percentile_approx}} accuracy 
> from the default (10000) to 100 on Spark 3.1. When left at 10000, the stage 
> was exceeding 15 hours and hitting some of our application time limits.
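> For reference, the accuracy is the optional third argument of 
> {{percentile_approx}}; a minimal sketch of the reduced-accuracy variant (not 
> the exact production query):
> {code:sql}
> select
>   col1,
>   -- accuracy lowered from the default of 10000 to 100
>   percentile_approx(col2, 0.95, 100) as q_95
> from data
> group by 1
> {code}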
> *Spark 2.4*
> !Screenshot 2024-01-05 at 3.53.10 PM.png!
>  - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
>  - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size
> *Spark 3.1*
> !Screenshot 2024-01-05 at 3.51.52 PM.png!
>  - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 
> 20.3 GiB shuffle write size
>  - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size
> Prior to digging further into this, I thought it may have been some AQE 
> issue. That was incorrect; in fact, the post-aggregation task/partition 
> count was much worse without AQE, at about 15.
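> (For context, AQE was toggled through the standard config; a minimal sketch of 
> the session-level switch used while testing, not our exact job config:)
> {code:sql}
> -- disable adaptive query execution for an A/B comparison
> SET spark.sql.adaptive.enabled = false;
> {code}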
> I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
> since 3.1, to see if there's anything significant. I just looked at PRs 
> against {{QuantileSummaries.scala}} and {{{}ApproximatePercentile.scala{}}}, 
> and ignored docs, formatting, refactoring, and very simple behavioural 
> changes. These are the notable changes, with some commentary.
> For changes between 2.4 and 3.1:
>  # 
> [https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
>  which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
> This seems potentially relevant, especially since the shuffle read, which is 
> likely just executing merges, is the part that is struggling. That said, I 
> would have expected this to also impact the partial_percentile_approx 
> calculations.
>  # 
> [https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
>  -appears to change the behaviour in evaluation for {{case when}} / 
> conditional inputs. This is possibly relevant, but our SQL code only plumbs a 
> primitive in. Our actual job is a little more complex and the SQL is over a 
> subquery, but the subquery is not manipulating the column used for 
> {{{}percentile_approx{}}}.- I misunderstood this change initially. It looks 
> like an optimization to avoid reconstructing the data type for some parent 
> plan nodes. Our SQL code does consume percentiles in some CaseWhen clauses, 
> so I'll need to dig more into this.
> I didn't look much further into other code. I did not see relevant changes 
> in {{{}AggregatingAccumulator{}}}.
> For changes from 3.1 onwards to {{ApproximatePercentile}} and 
> {{{}QuantileSummaries{}}}:
>  # (in 3.2) 
> [https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
>  which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
> It seems to only be executed on the final result collection and optimizes for 
> cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}} - multiple 
> value percentile calculations.
>  # (in 3.2) 
> [https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
>  which appears to change some plan interfaces. It reads as though it only 
> impacted query compilation performance, rather than execution performance.
> Because I'm not seeing any changes that might plausibly address this 
> performance issue, I'm led to believe this is still an existing problem in 
> Spark.
> I have some follow-up I'll do to dig into this further, namely:
>  - Test the job with {{percentile}} to see if it performs better (a sketch 
> follows below), although I'm a bit dubious we will get this to run with 37 
> billion input rows
>  - Test the job on Spark 3.3, which we have production-ready in our environment
>  - Observe some of the thread dumps / profiling for the stage with poor 
> performance
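> For that first follow-up item, a minimal sketch of the exact-{{percentile}} 
> variant, assuming the same table as above:
> {code:sql}
> select
>   col1,
>   -- exact percentiles: no sketch merging on the reduce side, but much
>   -- heavier per-group state than percentile_approx
>   percentile(col2, 0.95) as q_95,
>   percentile(col2, 0.99) as q_99
> from data
> group by 1
> {code}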

[jira] [Commented] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-12 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806201#comment-17806201
 ] 

Joey Pereira commented on SPARK-40499:
--

I've found another app of ours with a similarly unbearable regression. The app 
previously ran in under 2 hours prior to upgrading and now takes over 24 hours 
on Spark 3. It's a lot simpler, so this is more or less the SQL it's doing (with 
a number of other internal group-by fields removed for brevity).

{code:sql}
with t as (
  select
    method,
    response_code,
    if(response_code >= 200 and response_code < 300, duration_ms, null) as duration_ms_2xx
  from logs
)
select
  method,
  count(*) as cnt,
  sum(case when response_code >= 200 and response_code < 300 then 1 else 0 end) as cnt_2xx,
  sum(case when response_code >= 300 and response_code < 400 then 1 else 0 end) as cnt_3xx,
  sum(case when response_code >= 400 and response_code < 500 then 1 else 0 end) as cnt_4xx,
  sum(case when response_code >= 500 then 1 else 0 end) as cnt_5xx,
  approx_percentile(duration_ms_2xx, 0.5) as latency_p50,
  approx_percentile(duration_ms_2xx, 0.9) as latency_p90,
  approx_percentile(duration_ms_2xx, 0.99) as latency_p99,
  approx_percentile(duration_ms_2xx, 0.999) as latency_p999
from t
group by 1
{code}

This is simple enough that I'm a bit more convinced it could be the CaseWhen 
change.
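
(As an aside, the array consolidation mentioned in SPARK-46706 would apply here 
as well; a hedged sketch covering just the percentile part of the query above, 
with the counter columns omitted:)
{code:sql}
with t as (
  select
    method,
    response_code,
    if(response_code >= 200 and response_code < 300, duration_ms, null) as duration_ms_2xx
  from logs
)
select
  method,
  lat[0] as latency_p50,
  lat[1] as latency_p90,
  lat[2] as latency_p99,
  lat[3] as latency_p999
from (
  select
    method,
    -- one approx_percentile sketch per group instead of four
    approx_percentile(duration_ms_2xx, array(0.5, 0.9, 0.99, 0.999)) as lat
  from t
  group by 1
) q
{code}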

> Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0
> 
>
> Key: SPARK-40499
> URL: https://issues.apache.org/jira/browse/SPARK-40499
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.1
> Environment: hadoop: 3.0.0 
> spark:  2.4.0 / 3.2.1
> shuffle:spark 2.4.0
>Reporter: xuanzhiang
>Priority: Major
> Attachments: Screenshot 2024-01-05 at 3.51.52 PM.png, Screenshot 
> 2024-01-05 at 3.53.10 PM.png, spark2.4-shuffle-data.png, 
> spark3.2-shuffle-data.png
>
>
> spark.sql(
>       s"""
>          |SELECT
>          | Info ,
>          | PERCENTILE_APPROX(cost,0.5) cost_p50,
>          | PERCENTILE_APPROX(cost,0.9) cost_p90,
>          | PERCENTILE_APPROX(cost,0.95) cost_p95,
>          | PERCENTILE_APPROX(cost,0.99) cost_p99,
>          | PERCENTILE_APPROX(cost,0.999) cost_p999
>          |FROM
>          | textData
>          |""".stripMargin)
>  * When we used Spark 2.4.0, aggregation adopted objHashAggregator and stage 2 
> pulled shuffle data very quickly. But when we use Spark 3.2.1 with the old 
> shuffle, 140 MB of shuffle data takes 3 hours.
>  * If we upgrade the shuffle, will we get a performance regression?





[jira] [Commented] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-12 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806202#comment-17806202
 ] 

Joey Pereira commented on SPARK-40499:
--

(I realized I can't really re-open the ticket, so I'll go ahead and open a new 
one)




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803735#comment-17803735
 ] 

Joey Pereira edited comment on SPARK-40499 at 1/6/24 1:17 AM:
--

Chiming in, I'm currently hitting a similar issue while doing upgrades from 
Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take 
{{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
(reading 37 billion rows and aggregating to ~100 and ~7000 rows, with different 
groupBy keys respectively). This roughly what the query looks like:
{code:java}
select
col1,
percentile_approx(col2, 0.95) as q_95,
percentile_approx(col2, 0.99) as q_99,
percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}
Also, I do recognize this could be written more optimally as 
{{percentile_approx(col2, array(0.95, 0.99))}} to save on computation, this is 
just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage is roughly the same time (and size) 
but the shuffle read is drastically slower. Note, in this application the 
shuffle read is also collecting for a broadcast. The difference for the shuffle 
read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was 
after reducing the {{percentile_approx}} precision from the default (1) to 
100 on Spark 3.1. When left at 1, the stage was exceeding 15 hours and 
hitting some of our application time limits.

*Spark 2.4*

!Screenshot 2024-01-05 at 3.53.10 PM.png!
 - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
 - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*

!Screenshot 2024-01-05 at 3.51.52 PM.png!
 - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 
20.3 GiB shuffle write size
 - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue. 
That was incorrect and in fact the post-aggregation task/partition count was 
much worse without AQE, at about 15.

I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
since 3.1, to see if there's anything significant. I just looked at PRs against 
{{QuantileSummaries.scala}} and {{{}ApproximatePercentile.scala{}}}, and 
ignored some docs, formatting, refactoring, or very simple behavioural changes. 
These are the notable changes with some commentary.

For changes between 2.4 and 3.1:
 # 
[https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
 which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
This seems potentially relevant, especially since the shuffle reading which is 
likely just executing merging is struggling. That being said, I would have 
maybe expected this to also impact the partial_percentile_approx calculations 
too.
 # 
[https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
 -appears to change the behaviour in evaluation for {{case when}} / conditional 
inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. 
Our actual job is a little more complex and the SQL is over a subquery, but the 
subquery is not manipulating the column using for {{{}percentile_approx{}}}.- I 
misunderstood this change initially. It looks like an optimization to avoid 
reconstructing the data type for some parent plan nodes. Our SQL code does 
consume percentiles in some CaseWhen clauses, so I'll need to dig more into 
this.

I didn't go far while looking into other code. I did not see relevant changes 
in {{{}AggregatingAccumulator{}}}.

For changes between 3.1 onwards to {{ApproximatePercentile}} and 
{{{}QuantileSummaries{}}}:
 # (in 3.2) 
[https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
 which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
It seems to only be executed on the final result collection and optimizes for 
cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}} - multiple 
value percentile calculations.
 # (in 3.2) 
[https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
 which appears to change some plan interfaces. It reads as though only impacted 
query compilation performance, rather than execution performance.

Because I'm not seeing any changes that may possible address this performance 
issue, I'm led to believe this is still an existing problem in Spark.

I have some follow-up I'll do to dig into this further, namely:
 - Test the job with {{percentile}} to see if it performs better, although I'm 
a bit dubious we will get this to run with 37 billion input rows
 - Test the job on Spark 3.3, which we have production-ready in our environment
 - Observe some of the thread 

[jira] [Comment Edited] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803735#comment-17803735
 ] 

Joey Pereira edited comment on SPARK-40499 at 1/6/24 1:17 AM:
--

Chiming in, I'm currently hitting a similar issue while doing upgrades from 
Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take 
{{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
(reading 37 billion rows and aggregating to ~100 and ~7000 rows, with different 
groupBy keys respectively). This roughly what the query looks like:
{code:java}
select
col1,
percentile_approx(col2, 0.95) as q_95,
percentile_approx(col2, 0.99) as q_99,
percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}
Also, I do recognize this could be written more optimally as 
{{percentile_approx(col2, array(0.95, 0.99))}} to save on computation, this is 
just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage is roughly the same time (and size) 
but the shuffle read is drastically slower. Note, in this application the 
shuffle read is also collecting for a broadcast. The difference for the shuffle 
read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was 
after reducing the {{percentile_approx}} precision from the default (1) to 
100 on Spark 3.1. When left at 1, the stage was exceeding 15 hours and 
hitting some of our application time limits.

*Spark 2.4*

!Screenshot 2024-01-05 at 3.53.10 PM.png!
 - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
 - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*

!Screenshot 2024-01-05 at 3.51.52 PM.png!
 - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 
20.3 GiB shuffle write size
 - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue. 
That was incorrect and in fact the post-aggregation task/partition count was 
much worse without AQE, at about 15.

I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
since 3.1, to see if there's anything significant. I just looked at PRs against 
{{QuantileSummaries.scala}} and {{{}ApproximatePercentile.scala{}}}, and 
ignored some docs, formatting, refactoring, or very simple behavioural changes. 
These are the notable changes with some commentary.

For changes between 2.4 and 3.1:
 # 
[https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
 which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
This seems potentially relevant, especially since the shuffle reading which is 
likely just executing merging is struggling. That being said, I would have 
maybe expected this to also impact the partial_percentile_approx calculations 
too.
 # 
[https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
 -appears to change the behaviour in evaluation for {{case when}} / conditional 
inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. 
Our actual job is a little more complex and the SQL is over a subquery, but the 
subquery is not manipulating the column using for {{{}percentile_approx{}}}.- I 
misunderstood this change initially. It looks like an optimization to avoid 
reconstructing the data type for some parent plan nodes. Our SQL code does 
consume percentiles in some CaseWhen clauses, so I'll need to dig more into 
this.

I didn't go far while looking into other code. I did not see relevant changes 
in {{{}AggregatingAccumulator{}}}.

For changes between 3.1 onwards to {{ApproximatePercentile}} and 
{{{}QuantileSummaries{}}}:
 # (in 3.2) 
[https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
 which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
It seems to only be executed on the final result collection and optimizes for 
cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}} - multiple 
value percentile calculations.
 # (in 3.2) 
[https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
 which appears to change some plan interfaces. It reads as though only impacted 
query compilation performance, rather than execution performance.

Because I'm not seeing any changes that may possible address this performance 
issue, I'm led to believe this is still an existing problem in Spark.

I have some follow-up I'll do to dig into this further, namely:
 - Test the job with {{percentile}} to see if it performs better, although I'm 
a bit dubious we will get this to run with 37 billion input rows
 - Test the job on Spark 3.3, which we have production-ready in our environment
 - Observe some of the thread 

[jira] [Comment Edited] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803735#comment-17803735
 ] 

Joey Pereira edited comment on SPARK-40499 at 1/6/24 1:12 AM:
--

Chiming in, I'm currently hitting a similar issue while doing upgrades from 
Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take 
{{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
(reading 37 billion rows and aggregating to ~100 and ~7000 rows, with different 
groupBy keys respectively). This roughly what the query looks like:
{code:java}
select
col1,
percentile_approx(col2, 0.95) as q_95,
percentile_approx(col2, 0.99) as q_99,
percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}
Also, I do recognize this could be written more optimally as 
{{percentile_approx(col2, array(0.95, 0.99))}} to save on computation, this is 
just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage is roughly the same time (and size) 
but the shuffle read is drastically slower. Note, in this application the 
shuffle read is also collecting for a broadcast. The difference for the shuffle 
read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was 
after reducing the {{percentile_approx}} precision from the default (1) to 
100 on Spark 3.1. When left at 1, the stage was exceeding 15 hours and 
hitting some of our application time limits.

*Spark 2.4*

!Screenshot 2024-01-05 at 3.53.10 PM.png!
 - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
 - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*

!Screenshot 2024-01-05 at 3.51.52 PM.png!
 - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 
20.3 GiB shuffle write size
 - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue. 
That was incorrect and in fact the post-aggregation task/partition count was 
much worse without AQE, at about 15.

I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
since 3.1, to see if there's anything significant. I just looked at PRs against 
{{QuantileSummaries.scala}} and {{{}ApproximatePercentile.scala{}}}, and 
ignored some docs, formatting, refactoring, or very simple behavioural changes. 
These are the notable changes with some commentary.

For changes between 2.4 and 3.1:
 # 
[https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
 which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
This seems potentially relevant, especially since the shuffle reading which is 
likely just executing merging is struggling. That being said, I would have 
maybe expected this to also impact the partial_percentile_approx calculations 
too.
 # 
[https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
 -appears to change the behaviour in evaluation for {{case when}} / conditional 
inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. 
Our actual job is a little more complex and the SQL is over a subquery, but the 
subquery is not manipulating the column using for {{{}percentile_approx{}}}.- I 
misunderstood this change initially. It looks like an optimization to avoid 
reconstructing the data type for some parent plan nodes. Our SQL code does 
consume percentiles in some CaseWhen clauses, so I'll need to dig more into 
this.

I didn't go far while looking into other code. I did not see relevant changes 
in {{{}AggregatingAccumulator{}}}.

For changes between 3.1 onwards to {{ApproximatePercentile}} and 
{{{}QuantileSummaries{}}}:
 # (in 3.2) 
[https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
 which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
It seems to only be executed on the final result collection and optimizes for 
cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}} - multiple 
value percentile calculations.
 # (in 3.2) 
[https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
 which appears to change some plan interfaces. It reads as though only impacted 
query compilation performance, rather than execution performance.

Because I'm not seeing any changes that may possible address this performance 
issue, I'm led to believe this is still an existing problem in Spark.

I have some follow-up I'll do to dig into this further, namely:
 - Test the job with {{percentile}} to see if it performs better, although I'm 
a bit dubious we will get this to run with 37 billion input rows
 - Test the job on Spark 3.3, which we have production-ready in our environment
 - Observe some of the thread 

[jira] [Comment Edited] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803735#comment-17803735
 ] 

Joey Pereira edited comment on SPARK-40499 at 1/6/24 1:10 AM:
--

Chiming in, I'm currently hitting a similar issue while doing upgrades from 
Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take 
{{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
(reading 37 billion rows and aggregating to ~100 and ~7000 rows, with different 
groupBy keys respectively). This roughly what the query looks like:
{code:java}
select
col1,
percentile_approx(col2, 0.95) as q_95,
percentile_approx(col2, 0.99) as q_99,
percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}
Also, I do recognize this could be written more optimally as 
{{percentile_approx(col2, array(0.95, 0.99))}} to save on computation, this is 
just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage is roughly the same time (and size) 
but the shuffle read is drastically slower. Note, in this application the 
shuffle read is also collecting for a broadcast. The difference for the shuffle 
read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was 
after reducing the {{percentile_approx}} precision from the default (1) to 
100 on Spark 3.1. When left at 1, the stage was exceeding 15 hours and 
hitting some of our application time limits.

*Spark 2.4*

!Screenshot 2024-01-05 at 3.53.10 PM.png!
 - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
 - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*

!Screenshot 2024-01-05 at 3.51.52 PM.png!
 - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 
20.3 GiB shuffle write size
 - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue. 
That was incorrect and in fact the post-aggregation task/partition count was 
much worse without AQE, at about 15.

I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
since 3.1, to see if there's anything significant. I just looked at PRs against 
{{QuantileSummaries.scala}} and {{{}ApproximatePercentile.scala{}}}, and 
ignored some docs, formatting, refactoring, or very simple behavioural changes. 
These are the notable changes with some commentary.

For changes between 2.4 and 3.1:
 # 
[https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
 which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
This seems potentially relevant, especially since the shuffle reading which is 
likely just executing merging is struggling. That being said, I would have 
maybe expected this to also impact the partial_percentile_approx calculations 
too.
 # 
[https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
 -appears to change the behaviour in evaluation for {{case when}} / conditional 
inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. 
Our actual job is a little more complex and the SQL is over a subquery, but the 
subquery is not manipulating the column using for {{{}percentile_approx{}}}.- I 
misunderstood this change initially. It looks like an optimization to avoid 
reconstructing the data type for some parent plan nodes. Our SQL code does 
consume percentiles in some CaseWhen clauses, so I'll need to dig more into 
this.

I didn't go far while looking into other code. I did not see relevant changes 
in {{{}AggregatingAccumulator{}}}.

For changes between 3.1 onwards to {{ApproximatePercentile}} and 
{{{}QuantileSummaries{}}}:
 # (in 3.2) 
[https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
 which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
It seems to only be executed on the final result collection and optimizes for 
cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}} - multiple 
value percentile calculations.
 # (in 3.2) 
[https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
 which appears to change some plan interfaces. It reads as though only impacted 
query compilation performance, rather than execution performance.

Because I'm not seeing any changes that may possible address this performance 
issue, I'm led to believe this is still an existing problem in Spark.

I have some follow-up I'll do to dig into this further, namely:
 - Test the job with {{percentile}} to see if it performs better, although I'm 
a bit dubious we will get this to run with 37 billion input rows
 - Test the job on Spark 3.3, which we have production-ready in our environment
 - Observe some of the thread 

[jira] [Comment Edited] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803735#comment-17803735
 ] 

Joey Pereira edited comment on SPARK-40499 at 1/6/24 1:03 AM:
--

Chiming in, I'm currently hitting a similar issue while doing upgrades from 
Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take 
{{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
(reading 37 billion rows and aggregating to ~100 and ~7000 rows, with different 
groupBy keys respectively). This roughly what the query looks like:
{code:java}
select
col1,
percentile_approx(col2, 0.95) as q_95,
percentile_approx(col2, 0.99) as q_99,
percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}
Also, I do recognize this could be written more optimally as 
{{percentile_approx(col2, array(0.95, 0.99))}} to save on computation, this is 
just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage is roughly the same time (and size) 
but the shuffle read is drastically slower. Note, in this application the 
shuffle read is also collecting for a broadcast. The difference for the shuffle 
read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was 
after reducing the {{percentile_approx}} precision from the default (1) to 
100 on Spark 3.1. When left at 1, the stage was exceeding 15 hours and 
hitting some of our application time limits.

*Spark 2.4*

!Screenshot 2024-01-05 at 3.53.10 PM.png!
 - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
 - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*

!Screenshot 2024-01-05 at 3.51.52 PM.png!
 - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 
20.3 GiB shuffle write size
 - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue. 
That was incorrect and in fact the post-aggregation task/partition count was 
much worse without AQE, at about 15.
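For completeness, this is roughly how the AQE comparison was toggled 
(illustrative sketch only; {{spark.sql.adaptive.enabled}} is the standard 
switch, the helper name is mine, and the query is a trimmed version of the one 
above):
{code:scala}
// Force execution under a given AQE setting so the post-aggregation partition
// counts can be compared; AQE coalesces post-shuffle partitions when enabled.
def runWithAqe(enabled: Boolean): Long = {
  spark.conf.set("spark.sql.adaptive.enabled", enabled.toString)
  spark.sql("select col1, percentile_approx(col2, 0.95) as q_95 from data group by 1")
    .count()
}
val rowsWithAqe    = runWithAqe(enabled = true)
val rowsWithoutAqe = runWithAqe(enabled = false)
{code}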

I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
since 3.1, to see if there's anything significant. I just looked at PRs against 
{{QuantileSummaries.scala}} and {{{}ApproximatePercentile.scala{}}}, and 
ignored some docs, formatting, refactoring, or very simple behavioural changes. 
These are the notable changes with some commentary.

For changes between 2.4 and 3.1:
 # 
[https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
 which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
This seems potentially relevant, especially since the shuffle-read side, which 
is likely just executing merges, is the part that is struggling. That being 
said, I would have expected this to also impact the partial_percentile_approx 
calculations.
 # 
[https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
 appears to change the behaviour in evaluation for {{case when}} / conditional 
inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. 
Our actual job is a little more complex and the SQL is over a subquery, but the 
subquery is not manipulating the column used for {{{}percentile_approx{}}}.

I didn't dig much further into other code, and I did not see relevant changes 
in {{{}AggregatingAccumulator{}}}.

For changes from 3.1 onwards to {{ApproximatePercentile}} and 
{{{}QuantileSummaries{}}}:
 # (in 3.2) 
[https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
 which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
It seems to only be executed on the final result collection and optimizes for 
cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}} - multiple 
value percentile calculations.
 # (in 3.2) 
[https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
 which appears to change some plan interfaces. It reads as though it only 
impacted query compilation performance, rather than execution performance.

Because I'm not seeing any changes that may possibly address this performance 
issue, I'm led to believe this is still an existing problem in Spark.

I have some follow-up I'll do to dig into this further, namely:
 - Test the job with {{percentile}} to see if it performs better (see the 
sketch after this list), although I'm a bit dubious that we will get this to 
run with 37 billion input rows
 - Test the job on Spark 3.3, which we have production-ready in our environment
 - Observe some of the thread dumps / profiling for the stage with poor 
performance
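A sketch of that first follow-up item, swapping in the exact {{percentile}} 
aggregate (same {{spark}} / {{data}} assumptions as above; the exact aggregate 
keeps per-group value counts in memory, which is why I'm dubious it will cope 
with 37 billion rows):
{code:scala}
// Exact percentiles for comparison with percentile_approx; expected to be much
// heavier since it tracks every distinct value per group.
val exact = spark.sql("""
  select
    col1,
    percentile(col2, 0.95) as q_95_exact,
    percentile(col2, 0.99) as q_99_exact
  from data
  group by 1
""")
{code}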



[jira] [Comment Edited] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803735#comment-17803735
 ] 

Joey Pereira edited comment on SPARK-40499 at 1/6/24 1:03 AM:
--

Chiming in, I'm currently hitting a similar issue while doing upgrades from 
Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take 
{{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
(reading 37 billion rows and aggregating to ~100 and ~7000 rows, with different 
groupBy keys respectively). This is roughly what the query looks like:
{code:java}
select
col1,
percentile_approx(col2, 0.95) as q_95,
percentile_approx(col2, 0.99) as q_99,
percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}
Also, I do recognize this could be written more optimally as 
{{percentile_approx(col2, array(0.95, 0.99))}} to save on computation, this is 
just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage is roughly the same time (and size) 
but the shuffle read is drastically slower. Note, in this application the 
shuffle read is also collecting for a broadcast. The difference for the shuffle 
read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was 
after reducing the {{percentile_approx}} precision from the default (10000) to 
100 on Spark 3.1. When left at 10000, the stage was exceeding 15 hours and 
hitting some of our application time limits.

*Spark 2.4*

!Screenshot 2024-01-05 at 3.53.10 PM.png!
 - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
 - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*

!Screenshot 2024-01-05 at 3.51.52 PM.png!
 - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 
20.3 GiB shuffle write size
 - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue. 
That was incorrect and in fact the post-aggregation task/partition count was 
much worse without AQE, at about 15.

I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
since 3.1, to see if there's anything significant. I just looked at PRs against 
{{QuantileSummaries.scala}} and {{{}ApproximatePercentile.scala{}}}, and 
ignored some docs, formatting, refactoring, or very simple behavioural changes. 
These are the notable changes with some commentary.

For changes between 2.4 and 3.1:
 # 
[https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
 which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
This seems potentially relevant, especially since the shuffle-read side, which 
is likely just executing merges, is the part that is struggling. That being 
said, I would have expected this to also impact the partial_percentile_approx 
calculations.
 # 
[https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
 appears to change the behaviour in evaluation for {{case when}} / conditional 
inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. 
Our actual job is a little more complex and the SQL is over a subquery, but the 
subquery is not manipulating the column used for {{{}percentile_approx{}}}.

I didn't dig much further into other code, and I did not see relevant changes 
in {{{}AggregatingAccumulator{}}}.

For changes from 3.1 onwards to {{ApproximatePercentile}} and 
{{{}QuantileSummaries{}}}:
 # (in 3.2) 
[https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
 which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
It seems to only be executed on the final result collection and optimizes for 
cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}} - multiple 
value percentile calculations.
 # (in 3.2) 
[https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
 which appears to change some plan interfaces. It reads as though it only 
impacted query compilation performance, rather than execution performance.

I have some follow-up I'll do to dig into this further, namely:
 - Test the job with {{percentile}} to see if it performs better, although I'm 
a bit dubious we will get this to run with 37 billion input rows
 - Test the job on Spark 3.3, which we have production-ready in our environment
 - Observe some of the thread dumps / profiling for the stage with poor 
performance



[jira] [Comment Edited] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803735#comment-17803735
 ] 

Joey Pereira edited comment on SPARK-40499 at 1/6/24 1:02 AM:
--

Chiming in, I'm currently hitting a similar issue while doing upgrades from 
Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take 
{{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
(reading 37 billion rows and aggregating to ~100 and ~7000 rows, with different 
groupBy keys respectively). This is roughly what the query looks like:
{code:java}
select
col1,
percentile_approx(col2, 0.95) as q_95,
percentile_approx(col2, 0.99) as q_99,
percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}
Also, I do recognize this could be written more optimally as 
{{percentile_approx(col2, array(0.95, 0.99))}} to save on computation, this is 
just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage is roughly the same time (and size) 
but the shuffle read is drastically slower. Note, in this application the 
shuffle read is also collecting for a broadcast. The difference for the shuffle 
read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was 
after reducing the {{percentile_approx}} precision from the default (10000) to 
100 on Spark 3.1. When left at 10000, the stage was exceeding 15 hours and 
hitting some of our application time limits.

*Spark 2.4*

!Screenshot 2024-01-05 at 3.53.10 PM.png!
 - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
 - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*

!Screenshot 2024-01-05 at 3.51.52 PM.png!
 - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 
20.3 GiB shuffle write size
 - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue. 
That was incorrect and in fact the post-aggregation task/partition count was 
much worse without AQE, at about 15.

I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
since 3.1, to see if there's anything significant. I just looked at PRs against 
{{QuantileSummaries.scala}} and {{{}ApproximatePercentile.scala{}}}, and 
ignored some docs, formatting, refactoring, or very simple behavioural changes. 
These are the notable changes with some commentary.

For changes between 2.4 and 3.1:
 # 
[https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
 which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
This seems potentially relevant, especially since the shuffle-read side, which 
is likely just executing merges, is the part that is struggling. That being 
said, I would have expected this to also impact the partial_percentile_approx 
calculations.
 # 
[https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
 appears to change the behaviour in evaluation for {{case when}} / conditional 
inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. 
Our actual job is a little more complex and the SQL is over a subquery, but the 
subquery is not manipulating the column used for {{{}percentile_approx{}}}.

I didn't dig much further into other code, and I did not see relevant changes 
in {{{}AggregatingAccumulator{}}}.

For changes from 3.1 onwards to {{ApproximatePercentile}} and 
{{{}QuantileSummaries{}}}:
 # (in 3.2) 
[https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
 which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
While that might be nice, it seems to only be executed on the final result 
collection and optimizes for cases like {{percentile_approx(..., array(0.5, 
0.9, 0.95), ...)}} - multiple value percentile calculations.
 # (in 3.2) 
[https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
 which appears to change some plan interfaces. It reads as though it only 
impacted query compilation performance, rather than execution performance.

I have some follow-up I'll do to dig into this further, namely:
 - Test the job with {{percentile}} to see if it performs better, although I'm 
a bit dubious we will get this to run with 37 billion input rows
 - Test the job on Spark 3.3, which we have production-ready in our environment
 - Observe some of the thread dumps / profiling for the stage with poor 
performance



[jira] [Commented] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803735#comment-17803735
 ] 

Joey Pereira commented on SPARK-40499:
--

Chiming in, I'm currently hitting a similar issue while doing upgrades from 
Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take 
{{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
(reading 37 billion rows and aggregating to ~100 and ~7000 rows with different 
groupBy keys). This is roughly what the query looks like:

{code}
select
col1,
percentile_approx(col2, 0.95) as q_95,
percentile_approx(col2, 0.99) as q_99,
percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}

Also, I do recognize this could be written more optimally as 
{{percentile_approx(col2, array(0.95, 0.99))}} to save on computation, this is 
just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage is roughly the same time (and size) 
but the shuffle read is drastically slower. Note, in this application the 
shuffle read is also collecting for a broadcast. The difference for the shuffle 
read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was 
after reducing the {{percentile_approx}} precision from the default (10000) to 
100 on Spark 3.1. When left at 10000, the stage was exceeding 15 hours and 
hitting some of our application time limits.

*Spark 2.4*

 !Screenshot 2024-01-05 at 3.53.10 PM.png! 
- Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
- Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size


*Spark 3.1*

 !Screenshot 2024-01-05 at 3.51.52 PM.png! 

- Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 20.3 
GiB shuffle write size
- Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue. 
That was incorrect and in fact the post-aggregation task/partition count was 
much worse without AQE, at about 15.

I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
since 3.1, to see if there's anything significant. I just looked at PRs against 
{{QuantileSummaries.scala}} and {{ApproximatePercentile.scala}}, and ignored 
some docs, formatting, refactoring, or very simple behavioural changes. These 
are the notable changes with some commentary.

For changes between 2.4 and 3.1:
# 
https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543 
which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
This seems potentially relevant, especially since the shuffle-read side, which 
is likely just executing merges, is the part that is struggling. That being 
said, I would have expected this to also impact the partial_percentile_approx 
calculations.
# 
https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3 
appears to change the behaviour in evaluation for {{case when}} / conditional 
inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. 
Our actual job is a little more complex and the SQL is over a subquery, but the 
subquery is not manipulating the column used for {{percentile_approx}}.

I didn't dig much further into other code, and I did not see relevant changes 
in {{AggregatingAccumulator}}.

For changes from 3.1 onwards to {{ApproximatePercentile}} and 
{{QuantileSummaries}}:
# (in 3.2) 
https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0 
which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
While that might be nice, it seems to only be executed on the final result 
collection and optimizes for cases like {{percentile_approx(..., array(0.5, 
0.9, 0.95), ...)}} - multiple value percentile calculations.
# (in 3.2) 
https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3 
which appears to change some plan interfaces. It reads as though it only 
impacted query compilation performance, rather than execution performance.

I have some follow-up I'll do to dig into this further, namely:
- Test the job with {{percentile}} to see if it performs better, although I'm a 
bit dubious we will get this to run with 37 billion input rows
- Test the job on Spark 3.3, which we have production-ready in our environment
- Observe some of the thread dumps / profiling for the stage with poor 
performance

> Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0
> 
>
> Key: SPARK-40499
> URL: https://issues.apache.org/jira/browse/SPARK-40499
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.1
> Environment: hadoop: 3.0.0 

[jira] [Comment Edited] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803735#comment-17803735
 ] 

Joey Pereira edited comment on SPARK-40499 at 1/6/24 1:02 AM:
--

Chiming in, I'm currently hitting a similar issue while doing upgrades from 
Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take 
{{percentile_approx}} of a fairly large dataset and reduce to only a few rows 
(reading 37 billion rows and aggregating to ~100 and ~7000 rows with different 
groupBy keys). This is roughly what the query looks like:
{code:java}
select
col1,
percentile_approx(col2, 0.95) as q_95,
percentile_approx(col2, 0.99) as q_99,
percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}
Also, I do recognize this could be written more optimally as 
{{percentile_approx(col2, array(0.95, 0.99))}} to save on computation, this is 
just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage is roughly the same time (and size) 
but the shuffle read is drastically slower. Note, in this application the 
shuffle read is also collecting for a broadcast. The difference for the shuffle 
read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was 
after reducing the {{percentile_approx}} precision from the default (10000) to 
100 on Spark 3.1. When left at 10000, the stage was exceeding 15 hours and 
hitting some of our application time limits.

*Spark 2.4*

!Screenshot 2024-01-05 at 3.53.10 PM.png!
 - Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
 - Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*

!Screenshot 2024-01-05 at 3.51.52 PM.png!
 - Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 
20.3 GiB shuffle write size
 - Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue. 
That was incorrect and in fact the post-aggregation task/partition count was 
much worse without AQE, at about 15.

I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes 
since 3.1, to see if there's anything significant. I just looked at PRs against 
{{QuantileSummaries.scala}} and {{{}ApproximatePercentile.scala{}}}, and 
ignored some docs, formatting, refactoring, or very simple behavioural changes. 
These are the notable changes with some commentary.

For changes between 2.4 and 3.1:
 # 
[https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543]
 which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. 
This seems potentially relevant, especially since the shuffle-read side, which 
is likely just executing merges, is the part that is struggling. That being 
said, I would have expected this to also impact the partial_percentile_approx 
calculations.
 # 
[https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3]
 appears to change the behaviour in evaluation for {{case when}} / conditional 
inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. 
Our actual job is a little more complex and the SQL is over a subquery, but the 
subquery is not manipulating the column used for {{{}percentile_approx{}}}.

I didn't dig much further into other code, and I did not see relevant changes 
in {{{}AggregatingAccumulator{}}}.

For changes from 3.1 onwards to {{ApproximatePercentile}} and 
{{{}QuantileSummaries{}}}:
 # (in 3.2) 
[https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0]
 which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. 
While that might be nice, it seems to only be executed on the final result 
collection and optimizes for cases like {{percentile_approx(..., array(0.5, 
0.9, 0.95), ...)}} - multiple value percentile calculations.
 # (in 3.2) 
[https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3]
 which appears to change some plan interfaces. It reads as though it only 
impacted query compilation performance, rather than execution performance.

I have some follow-up I'll do to dig into this further, namely:
 - Test the job with {{percentile}} to see if it performs better, although I'm 
a bit dubious we will get this to run with 37 billion input rows
 - Test the job on Spark 3.3, which we have production-ready in our environment
 - Observe some of the thread dumps / profiling for the stage with poor 
performance



[jira] [Updated] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joey Pereira updated SPARK-40499:
-
Attachment: Screenshot 2024-01-05 at 3.53.10 PM.png

> Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0
> 
>
> Key: SPARK-40499
> URL: https://issues.apache.org/jira/browse/SPARK-40499
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.1
> Environment: hadoop: 3.0.0 
> spark:  2.4.0 / 3.2.1
> shuffle:spark 2.4.0
>Reporter: xuanzhiang
>Priority: Major
> Attachments: Screenshot 2024-01-05 at 3.51.52 PM.png, Screenshot 
> 2024-01-05 at 3.53.10 PM.png, spark2.4-shuffle-data.png, 
> spark3.2-shuffle-data.png
>
>
> spark.sql(
>       s"""
>          |SELECT
>          | Info ,
>          | PERCENTILE_APPROX(cost,0.5) cost_p50,
>          | PERCENTILE_APPROX(cost,0.9) cost_p90,
>          | PERCENTILE_APPROX(cost,0.95) cost_p95,
>          | PERCENTILE_APPROX(cost,0.99) cost_p99,
>          | PERCENTILE_APPROX(cost,0.999) cost_p999
>          |FROM
>          | textData
>          |""".stripMargin)
>  * When we used Spark 2.4.0, aggregation adopted objHashAggregator and stage 2 
> pulled shuffle data very quickly. But when we use Spark 3.2.1 with the old 
> shuffle, 140M of shuffle data costs 3 hours.
>  * If we upgrade the shuffle, will we get a performance regression?






[jira] [Updated] (SPARK-40499) Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0

2024-01-05 Thread Joey Pereira (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joey Pereira updated SPARK-40499:
-
Attachment: Screenshot 2024-01-05 at 3.51.52 PM.png

> Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0
> 
>
> Key: SPARK-40499
> URL: https://issues.apache.org/jira/browse/SPARK-40499
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.1
> Environment: hadoop: 3.0.0 
> spark:  2.4.0 / 3.2.1
> shuffle:spark 2.4.0
>Reporter: xuanzhiang
>Priority: Major
> Attachments: Screenshot 2024-01-05 at 3.51.52 PM.png, Screenshot 
> 2024-01-05 at 3.53.10 PM.png, spark2.4-shuffle-data.png, 
> spark3.2-shuffle-data.png
>
>
> spark.sql(
>       s"""
>          |SELECT
>          | Info ,
>          | PERCENTILE_APPROX(cost,0.5) cost_p50,
>          | PERCENTILE_APPROX(cost,0.9) cost_p90,
>          | PERCENTILE_APPROX(cost,0.95) cost_p95,
>          | PERCENTILE_APPROX(cost,0.99) cost_p99,
>          | PERCENTILE_APPROX(cost,0.999) cost_p999
>          |FROM
>          | textData
>          |""".stripMargin)
>  * When we used Spark 2.4.0, aggregation adopted objHashAggregator and stage 2 
> pulled shuffle data very quickly. But when we use Spark 3.2.1 with the old 
> shuffle, 140M of shuffle data costs 3 hours.
>  * If we upgrade the shuffle, will we get a performance regression?






[jira] [Commented] (SPARK-42886) ClassNotFoundException: scala.math.Ordering$Reverse

2023-09-25 Thread Joey Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768801#comment-17768801
 ] 

Joey Pereira commented on SPARK-42886:
--

Adding another anecdote here. I've got a codebase on Scala 2.12.10 and I 
recently hit this bug while getting Spark 3.3.2 working for our systems.

There wasn't anything documented about a minimum version within 2.12.x, nor 
any conditional on the dependency. I see the Spark main branch does have at 
least one case of a conditional for Scala 2.13, so I wager something similar 
would be a reasonable solution here, e.g. 
[https://github.com/dongjoon-hyun/spark/blame/master/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala#L232-L233]
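To illustrate the kind of guard I mean, here is a hypothetical sketch only (not 
the actual Spark patch; the helper name is mine, and it assumes a 
{{com.esotericsoftware.kryo.Kryo}} instance at the registration site):
{code:scala}
import com.esotericsoftware.kryo.Kryo

// Register scala.math.Ordering$Reverse only when the running scala-library
// actually provides it (2.12.12+ / 2.13.x), instead of failing with
// ClassNotFoundException on older 2.12.x releases.
def registerOrderingReverse(kryo: Kryo): Unit = {
  try {
    kryo.register(Class.forName("scala.math.Ordering$Reverse"))
  } catch {
    case _: ClassNotFoundException => // older Scala 2.12.x: class absent, skip it
  }
}
{code}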

> ClassNotFoundException: scala.math.Ordering$Reverse
> ---
>
> Key: SPARK-42886
> URL: https://issues.apache.org/jira/browse/SPARK-42886
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 3.3.2
> Environment: Development environment
> MacBook Pro
> Java JDK ibm-1.8-362
>  
>Reporter: Steve Chong
>Priority: Major
>
> Hi,
> We are using the spark-mllib_2.12 dependency in a Java project.
> We are attempting to upgrade from version 3.3.1 to 3.3.2. This results in 
> unit tests breaking with exception: ClassNotFoundException: 
> scala.math.Ordering$Reverse
> A change was made to add the class to the KryoSerializer in 
> https://issues.apache.org/jira/browse/SPARK-42071
> scala.math.Ordering$Reverse was introduced in Scala 2.12.12. The Maven 
> dependency tree (mvn dependency:tree) shows that spark-mllib_2.12 brings in 
> scala-library version 2.12.8. Therefore, it doesn't contain 
> scala.math.Ordering$Reverse. 
> If the scala-library transitive dependency is excluded from the POM and an 
> explicit dependency is declared with version >=2.12.12, the tests will pass.
> Should the scala-library version contained in 3.3.2 be upgraded to >=2.12.12?
>  
>  


