[sql] how to connect query stage to Spark job/stages?

2023-11-29 Thread Chenghao Lyu
Hi,

I am seeking advice on measuring the performance of each QueryStage (QS) when 
AQE is enabled in Spark SQL. Specifically, I need help to automatically map a 
QS to its corresponding jobs (or stages) to get the QS runtime metrics.

I recorded the QS structure via a customized injected Query Stage Optimizer 
Rule. However, I am blocked by mapping a QS to its corresponding jobs (or 
stages) to aggregate its runtime metrics. I have tried the SparkListener, but 
neither the SparkListenerJobStart nor the SparkListenerStageSubmitted wraps the 
level of details that can match itself to a QS.

I am thinking of re-compiling Spark to enable the mapping. However, I am not 
experienced in the Spark source code…

Thanks for your help!

Cheers,
Chenghao


[SparkSQL, SparkUI, RESTAPI] How to extract the WholeStageCodeGen ids from SparkUI

2023-04-07 Thread Chenghao Lyu
Hi,

The detailed stage page shows the involved WholeStageCodegen Ids in its DAG 
visualization from the Spark UI when running a SparkSQL. (e.g., under the link 
node:18088/history/application_1663600377480_62091/stages/stage/?id=1=0).

However, I have trouble extracting the WholeStageCodegen ids from the DAG 
visualization via the RESTAPIs. Is there any other way to get the 
WholeStageCodegen Ids information for each stage automatically?

Cheers,
Chenghao


Re: Depolying stage-level scheduling for Spark SQL

2022-09-30 Thread Chenghao Lyu
Thanks for the clarification Tom!

A bit more backgrounds for what we want to do: we have proposed a fine-grained 
(stage-level) resource optimization approach in VLDB22 
https://www.vldb.org/pvldb/vol15/p3098-lyu.pdf and would like to try it over 
Spark. Our approach can recommend the resource configuration for each stage 
automatically (by using ML and our optimization framework), and we would like 
to see how to embed it in Spark. Initially, we consider that there is no AQE to 
make it simpler.

Now I see the problem in two folds (In both cases, the stage-level 
configurations will be automatically configured by our algorithm with the the 
upper and lower bounds of each tunable resource given by a user):

(1) If AQE is disabled in Spark SQL, and hence the RDD DAG will not be changed 
after the physical plan is selected, do you think it is feasible and worth 
exposing the RDDs and reusing the existing stage-level scheduling API for 
optimization?
(2) If AQE is enabled in Spark SQL, I would agree and prefer to add the 
stage-level resource optimization inside the AQE. Since I am not very 
experienced with the AQE part, would you list more potential challenges it may 
lead to?

Thanks in advance and I would really appreciate it if you could give us more 
feedback!

Cheers,
Chenghao
On Sep 30, 2022, 4:22 PM +0200, Tom Graves , 
wrote:
> see the original SPIP for as to why we only support RDD: 
> https://issues.apache.org/jira/browse/SPARK-27495
>
>
> The main problem is exactly what you are referring to. The RDD level is not 
> exposed to the user when using SQL or Dataframe API. This is on purpose and 
> user shouldn't have to know anything about the underlying impelementation 
> using RDDs. Especially with AQE and other optimizations that could change 
> things. You may start out with one physical plan and AQE can change it along 
> the way, so how does user change RDD at that point?   It would be very 
> difficult to expose this to the user and I don't think it should be.  I think 
> we would have to come up with some other way to apply stage level scheduling 
> to SQL/dataframe, or like mentioned in original issue if AQE gets smart 
> enough it would just do it for the user, but lots of factors that come into 
> play that make that difficult as well.
>
> Tom
> On Friday, September 30, 2022, 04:15:36 AM CDT, Chenghao Lyu 
>  wrote:
>
>
> Thanks for the reply!
>
> To clarify, for issue 2, it could still break apart a query into multiple 
> jobs without AQE — I have turned off the AQE in my posted example.
>
> For 1, an end user just needs to turn on/off a knob to use the stage-level 
> scheduling for Spark SQL — I am considering adding a component between the 
> Spark SQL module and the Spark Core model to optimize the stage-level 
> resource.
>
> Yes, SQL is declarative. It uses a sequence of components (such as a logical 
> planner, physical planner, and CBO) to get a selected physical plan. The RDDs 
> (with the transformations) are generated based on the selected physical plan 
> for execution. For now, we could only get the top-level RDD of the DAG of 
> RDDs by `spark.sql(q1).queryExecution.toRdd`, but it is not enough to make 
> stage-level scheduling decisions. The stage-level resources are profiled 
> based on the RDDs. If we could expose the all RDDs instead of the top-level 
> RDD, it seems possible to apply the stage-level scheduling here.
>
>
> P.S. let me attach the link for the RDD regeneration explicitly in case it is 
> not shown on the mail-list website: 
> https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql
>
> Cheers,
> Chenghao
> On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell , 
> wrote:
> > I think issue 2 is caused by adaptive query execution. This will break 
> > apart queries into multiple jobs, each subsequent job will generate a RDD 
> > that is based on previous ones.
> >
> > As for 1. I am not sure how much you want to expose to an end user here. 
> > SQL is declarative, and it does not specify how a query should be executed. 
> > I can imagine that you might use different resources for different types of 
> > stages, e.g. a scan stage and more compute heavy stages. This, IMO, should 
> > be based on analysis and costing the plan. For this RDD only stage level 
> > scheduling should be sufficient.
> >
> > On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu  wrote:
> > > Hi,
> > >
> > > I plan to deploy the stage-level scheduling for Spark SQL to apply some 
> > > fine-grained optimizations over the DAG of stages. However, I am blocked 
> > > by the following issues:
> > >
> > > 1. The current stage-level scheduling supports RDD APIs only. So is there 
> > > a

Re: Depolying stage-level scheduling for Spark SQL

2022-09-30 Thread Chenghao Lyu
Thanks for the reply!

To clarify, for issue 2, it could still break apart a query into multiple jobs 
without AQE — I have turned off the AQE in my posted example.

For 1, an end user just needs to turn on/off a knob to use the stage-level 
scheduling for Spark SQL — I am considering adding a component between the 
Spark SQL module and the Spark Core model to optimize the stage-level resource.

Yes, SQL is declarative. It uses a sequence of components (such as a logical 
planner, physical planner, and CBO) to get a selected physical plan. The RDDs 
(with the transformations) are generated based on the selected physical plan 
for execution. For now, we could only get the top-level RDD of the DAG of RDDs 
by `spark.sql(q1).queryExecution.toRdd`, but it is not enough to make 
stage-level scheduling decisions. The stage-level resources are profiled based 
on the RDDs. If we could expose the all RDDs instead of the top-level RDD, it 
seems possible to apply the stage-level scheduling here.


P.S. let me attach the link for the RDD regeneration explicitly in case it is 
not shown on the mail-list website: 
https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql

Cheers,
Chenghao
On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell , 
wrote:
> I think issue 2 is caused by adaptive query execution. This will break apart 
> queries into multiple jobs, each subsequent job will generate a RDD that is 
> based on previous ones.
>
> As for 1. I am not sure how much you want to expose to an end user here. SQL 
> is declarative, and it does not specify how a query should be executed. I can 
> imagine that you might use different resources for different types of stages, 
> e.g. a scan stage and more compute heavy stages. This, IMO, should be based 
> on analysis and costing the plan. For this RDD only stage level scheduling 
> should be sufficient.
>
> > On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu  wrote:
> > > Hi,
> > >
> > > I plan to deploy the stage-level scheduling for Spark SQL to apply some 
> > > fine-grained optimizations over the DAG of stages. However, I am blocked 
> > > by the following issues:
> > >
> > > 1. The current stage-level scheduling supports RDD APIs only. So is there 
> > > a way to reuse the stage-level scheduling for Spark SQL? E.g., how to 
> > > expose the RDD code (the transformations and actions) from a Spark SQL 
> > > (with SQL syntax)?
> > > 2. We do not quite understand why a Spark SQL could trigger multiple 
> > > jobs, and have some RDDs regenerated, as posted in here. Can anyone give 
> > > us some insight on the reasons and whether we can avoid the RDD 
> > > regeneration to save execution time?
> > >
> > > Thanks in advance.
> > >
> > > Cheers,
> > > Chenghao


Depolying stage-level scheduling for Spark SQL

2022-09-29 Thread Chenghao Lyu
Hi,

I plan to deploy the stage-level scheduling for Spark SQL to apply some 
fine-grained optimizations over the DAG of stages. However, I am blocked by the 
following issues:

1. The current stage-level scheduling supports RDD APIs only. So is there a way 
to reuse the stage-level scheduling for Spark SQL? E.g., how to expose the RDD 
code (the transformations and actions) from a Spark SQL (with SQL syntax)?
2. We do not quite understand why a Spark SQL could trigger multiple jobs, and 
have some RDDs regenerated, as posted in here. Can anyone give us some insight 
on the reasons and whether we can avoid the RDD regeneration to save execution 
time?

Thanks in advance.

Cheers,
Chenghao