Re: Deploying stage-level scheduling for Spark SQL

2022-10-03 Thread Tom Graves
1) In my opinion this is too complex for the average user. In this case I'm 
assuming you have some sort of optimizer that would apply it automatically 
for the user? If it's just in the research stage, can you just modify Spark 
to do experiments?
2) I think the main thing is having the heuristics and logic for changing what 
the user requested. It sounds like you might be working on a component to do 
this, but I haven't read the paper you pointed to yet either.
Also note there are already plugin points in Spark to add rules to the 
optimizer and physical plan for columnar. It sounds to me like you might be 
working on something that would fit better as a plugin if it automatically 
figures out what it thinks the best thing is. If this is the case, I go back 
to number 1 above: can you modify Spark to have the plugin point you need to 
do your experimentation and see if it makes sense?
Tom
On Friday, September 30, 2022, 11:31:35 AM CDT, Chenghao Lyu wrote:
 
 Thanks for the clarification Tom!

A bit more background on what we want to do: we have proposed a fine-grained 
(stage-level) resource optimization approach in VLDB22 
https://www.vldb.org/pvldb/vol15/p3098-lyu.pdf and would like to try it on 
Spark. Our approach can recommend the resource configuration for each stage 
automatically (using ML and our optimization framework), and we would like 
to see how to embed it in Spark. Initially, we assume that there is no AQE to 
keep things simpler.

Now I see the problem in two parts (in both cases, the stage-level 
configurations will be set automatically by our algorithm, within the 
upper and lower bounds of each tunable resource given by a user):

(1) If AQE is disabled in Spark SQL, and hence the RDD DAG will not be changed 
after the physical plan is selected, do you think it is feasible and worth 
exposing the RDDs and reusing the existing stage-level scheduling API for 
optimization? 
(2) If AQE is enabled in Spark SQL, I would agree and prefer to add the 
stage-level resource optimization inside the AQE. Since I am not very 
experienced with the AQE part, would you list more potential challenges it may 
lead to? 

Thanks in advance and I would really appreciate it if you could give us more 
feedback!
Cheers, Chenghao
On Sep 30, 2022, 4:22 PM +0200, Tom Graves wrote:

See the original SPIP for why we only support RDD: 
https://issues.apache.org/jira/browse/SPARK-27495

The main problem is exactly what you are referring to. The RDD level is not 
exposed to the user when using the SQL or DataFrame API. This is on purpose, 
and the user shouldn't have to know anything about the underlying 
implementation using RDDs, especially with AQE and other optimizations that 
could change things. You may start out with one physical plan and AQE can 
change it along the way, so how does the user change the RDD at that point? It 
would be very difficult to expose this to the user and I don't think it should 
be. I think we would have to come up with some other way to apply stage-level 
scheduling to SQL/DataFrame, or, as mentioned in the original issue, if AQE 
gets smart enough it would just do it for the user, but lots of factors come 
into play that make that difficult as well.
Tom
On Friday, September 30, 2022, 04:15:36 AM CDT, Chenghao Lyu wrote:

Thanks for the reply! 

To clarify, for issue 2, it could still break apart a query into multiple jobs 
without AQE — I have turned off the AQE in my posted example. 

For 1, an end user just needs to turn on/off a knob to use the stage-level 
scheduling for Spark SQL — I am considering adding a component between the 
Spark SQL module and the Spark Core module to optimize the stage-level resources. 

Yes, SQL is declarative. It uses a sequence of components (such as a logical 
planner, physical planner, and CBO) to get a selected physical plan. The RDDs 
(with the transformations) are generated based on the selected physical plan 
for execution. For now, we can only get the top-level RDD of the DAG of RDDs 
via `spark.sql(q1).queryExecution.toRdd`, but that is not enough to make 
stage-level scheduling decisions. The stage-level resources are profiled based 
on the RDDs. If we could expose all the RDDs instead of only the top-level RDD, 
it seems possible to apply the stage-level scheduling here.


P.S. let me attach the link for the RDD regeneration explicitly in case it is 
not shown on the mail-list website: 
https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql
Cheers, Chenghao
On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell wrote:

I think issue 2 is caused by adaptive query execution. This will break apart 
queries into multiple jobs; each subsequent job will generate an RDD that is 
based on previous ones. 
As for 1, I am not sure how much you want to expose to an end user here. SQL is 
declarative, and it does not specify how a query should be executed. I can 
imagine that you might use different 

Re: Deploying stage-level scheduling for Spark SQL

2022-09-30 Thread Chenghao Lyu
Thanks for the clarification Tom!

A bit more background on what we want to do: we have proposed a fine-grained 
(stage-level) resource optimization approach in VLDB22 
https://www.vldb.org/pvldb/vol15/p3098-lyu.pdf and would like to try it on 
Spark. Our approach can recommend the resource configuration for each stage 
automatically (using ML and our optimization framework), and we would like 
to see how to embed it in Spark. Initially, we assume that there is no AQE to 
keep things simpler.

Now I see the problem in two parts (in both cases, the stage-level 
configurations will be set automatically by our algorithm, within the 
upper and lower bounds of each tunable resource given by a user):

(1) If AQE is disabled in Spark SQL, and hence the RDD DAG will not be changed 
after the physical plan is selected, do you think it is feasible and worth 
exposing the RDDs and reusing the existing stage-level scheduling API for 
optimization?
(2) If AQE is enabled in Spark SQL, I would agree and prefer to add the 
stage-level resource optimization inside the AQE. Since I am not very 
experienced with the AQE part, would you list more potential challenges it may 
lead to?
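
To make the role of the user-given bounds concrete, here is a minimal sketch in plain Python (no Spark dependency). It only illustrates clamping a per-stage recommendation into a user-supplied range; the names `Bounds`, `clamp_stage_config`, the resource keys, and all numbers are hypothetical and are not taken from the paper or from Spark.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Bounds:
    """User-supplied lower and upper limit for one tunable resource."""
    lower: int
    upper: int


def clamp_stage_config(recommended, bounds):
    """Clamp each recommended value into the user's [lower, upper] range.

    Resources without a user-supplied bound are passed through unchanged.
    """
    clamped = {}
    for name, value in recommended.items():
        limit = bounds.get(name)
        if limit is None:
            clamped[name] = value
        else:
            clamped[name] = max(limit.lower, min(value, limit.upper))
    return clamped


# Hypothetical bounds and per-stage recommendations, for illustration only.
user_bounds = {
    "executor_cores": Bounds(1, 8),
    "executor_memory_gb": Bounds(2, 32),
}
recommendations = {
    0: {"executor_cores": 2, "executor_memory_gb": 1},    # e.g. a scan stage
    1: {"executor_cores": 16, "executor_memory_gb": 24},  # e.g. a compute-heavy stage
}
stage_configs = {
    stage_id: clamp_stage_config(rec, user_bounds)
    for stage_id, rec in recommendations.items()
}
```

In this sketch the recommender stays free to propose anything, and the bounds are enforced in one place before a configuration would be handed to the scheduler.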

Thanks in advance and I would really appreciate it if you could give us more 
feedback!

Cheers,
Chenghao
On Sep 30, 2022, 4:22 PM +0200, Tom Graves wrote:
> See the original SPIP for why we only support RDD: 
> https://issues.apache.org/jira/browse/SPARK-27495
>
>
> The main problem is exactly what you are referring to. The RDD level is not 
> exposed to the user when using the SQL or DataFrame API. This is on purpose, 
> and the user shouldn't have to know anything about the underlying 
> implementation using RDDs, especially with AQE and other optimizations that 
> could change things. You may start out with one physical plan and AQE can 
> change it along the way, so how does the user change the RDD at that point? 
> It would be very difficult to expose this to the user and I don't think it 
> should be. I think we would have to come up with some other way to apply 
> stage-level scheduling to SQL/DataFrame, or, as mentioned in the original 
> issue, if AQE gets smart enough it would just do it for the user, but lots 
> of factors come into play that make that difficult as well.
>
> Tom
> On Friday, September 30, 2022, 04:15:36 AM CDT, Chenghao Lyu wrote:
>
>
> Thanks for the reply!
>
> To clarify, for issue 2, it could still break apart a query into multiple 
> jobs without AQE — I have turned off the AQE in my posted example.
>
> For 1, an end user just needs to turn on/off a knob to use the stage-level 
> scheduling for Spark SQL — I am considering adding a component between the 
> Spark SQL module and the Spark Core module to optimize the stage-level 
> resources.
>
> Yes, SQL is declarative. It uses a sequence of components (such as a logical 
> planner, physical planner, and CBO) to get a selected physical plan. The RDDs 
> (with the transformations) are generated based on the selected physical plan 
> for execution. For now, we can only get the top-level RDD of the DAG of 
> RDDs via `spark.sql(q1).queryExecution.toRdd`, but that is not enough to make 
> stage-level scheduling decisions. The stage-level resources are profiled 
> based on the RDDs. If we could expose all the RDDs instead of only the 
> top-level RDD, it seems possible to apply the stage-level scheduling here.
>
>
> P.S. let me attach the link for the RDD regeneration explicitly in case it is 
> not shown on the mail-list website: 
> https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql
>
> Cheers,
> Chenghao
> On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell wrote:
> > I think issue 2 is caused by adaptive query execution. This will break 
> > apart queries into multiple jobs; each subsequent job will generate an RDD 
> > that is based on previous ones.
> >
> > As for 1, I am not sure how much you want to expose to an end user here. 
> > SQL is declarative, and it does not specify how a query should be executed. 
> > I can imagine that you might use different resources for different types of 
> > stages, e.g. a scan stage and more compute-heavy stages. This, IMO, should 
> > be based on analysis and costing of the plan. For this, RDD-only 
> > stage-level scheduling should be sufficient.
> >
> > On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu  wrote:
> > > Hi,
> > >
> > > I plan to deploy the stage-level scheduling for Spark SQL to apply some 
> > > fine-grained optimizations over the DAG of stages. However, I am blocked 
> > > by the following issues:
> > >
> > > 1. The current stage-level scheduling supports RDD APIs only. So is there 
> > > a way to reuse the stage-level scheduling for Spark SQL? E.g., how to 
> > > expose the RDD code (the transformations and actions) from a Spark SQL 
> > > (with SQL syntax)?
> > > 2. We do not quite understand why a Spark SQL could trigger multiple 
> > > jobs, and have some RDDs 

Re: Deploying stage-level scheduling for Spark SQL

2022-09-30 Thread Tom Graves
See the original SPIP for why we only support RDD: 
https://issues.apache.org/jira/browse/SPARK-27495

The main problem is exactly what you are referring to. The RDD level is not 
exposed to the user when using the SQL or DataFrame API. This is on purpose, 
and the user shouldn't have to know anything about the underlying 
implementation using RDDs, especially with AQE and other optimizations that 
could change things. You may start out with one physical plan and AQE can 
change it along the way, so how does the user change the RDD at that point? It 
would be very difficult to expose this to the user and I don't think it should 
be. I think we would have to come up with some other way to apply stage-level 
scheduling to SQL/DataFrame, or, as mentioned in the original issue, if AQE 
gets smart enough it would just do it for the user, but lots of factors come 
into play that make that difficult as well.
Tom
On Friday, September 30, 2022, 04:15:36 AM CDT, Chenghao Lyu wrote:
 
 Thanks for the reply! 

To clarify, for issue 2, it could still break apart a query into multiple jobs 
without AQE — I have turned off the AQE in my posted example. 

For 1, an end user just needs to turn on/off a knob to use the stage-level 
scheduling for Spark SQL — I am considering adding a component between the 
Spark SQL module and the Spark Core module to optimize the stage-level resources. 

Yes, SQL is declarative. It uses a sequence of components (such as a logical 
planner, physical planner, and CBO) to get a selected physical plan. The RDDs 
(with the transformations) are generated based on the selected physical plan 
for execution. For now, we can only get the top-level RDD of the DAG of RDDs 
via `spark.sql(q1).queryExecution.toRdd`, but that is not enough to make 
stage-level scheduling decisions. The stage-level resources are profiled based 
on the RDDs. If we could expose all the RDDs instead of only the top-level RDD, 
it seems possible to apply the stage-level scheduling here.


P.S. let me attach the link for the RDD regeneration explicitly in case it is 
not shown on the mail-list website: 
https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql
Cheers, Chenghao
On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell wrote:

I think issue 2 is caused by adaptive query execution. This will break apart 
queries into multiple jobs; each subsequent job will generate an RDD that is 
based on previous ones. 
As for 1, I am not sure how much you want to expose to an end user here. SQL is 
declarative, and it does not specify how a query should be executed. I can 
imagine that you might use different resources for different types of stages, 
e.g. a scan stage and more compute-heavy stages. This, IMO, should be based on 
analysis and costing of the plan. For this, RDD-only stage-level scheduling 
should be sufficient.
On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu  wrote:

Hi, 

I plan to deploy the stage-level scheduling for Spark SQL to apply some 
fine-grained optimizations over the DAG of stages. However, I am blocked by the 
following issues:   
   - The current stage-level scheduling supports RDD APIs only. So is there a 
way to reuse the stage-level scheduling for Spark SQL? E.g., how can we expose 
the RDD code (the transformations and actions) from a Spark SQL query (with 
SQL syntax)?
   - We do not quite understand why a Spark SQL query could trigger multiple 
jobs and have some RDDs regenerated, as posted here. Can anyone give us some 
insight into the reasons and whether we can avoid the RDD regeneration to save 
execution time? 
Thanks in advance.
Cheers, Chenghao

  

Re: Deploying stage-level scheduling for Spark SQL

2022-09-30 Thread Chenghao Lyu
Thanks for the reply!

To clarify, for issue 2, it could still break apart a query into multiple jobs 
without AQE — I have turned off the AQE in my posted example.

For 1, an end user just needs to turn on/off a knob to use the stage-level 
scheduling for Spark SQL — I am considering adding a component between the 
Spark SQL module and the Spark Core module to optimize the stage-level resources.

Yes, SQL is declarative. It uses a sequence of components (such as a logical 
planner, physical planner, and CBO) to get a selected physical plan. The RDDs 
(with the transformations) are generated based on the selected physical plan 
for execution. For now, we can only get the top-level RDD of the DAG of RDDs 
via `spark.sql(q1).queryExecution.toRdd`, but that is not enough to make 
stage-level scheduling decisions. The stage-level resources are profiled based 
on the RDDs. If we could expose all the RDDs instead of only the top-level RDD, 
it seems possible to apply the stage-level scheduling here.
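
As a rough illustration of what "all the RDDs" would give us, the following plain-Python toy model walks a lineage graph from its top-level node and groups the nodes into stages, cutting at shuffle edges. `Node` and `collect_stages` are hypothetical stand-ins invented for this sketch; they are not Spark classes, and the grouping rule is a simplification of how stages are actually formed.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Toy stand-in for an RDD: a name plus links to its parents."""
    name: str
    # Each entry is (parent, is_shuffle); a shuffle edge marks a stage boundary.
    parents: list = field(default_factory=list)


def collect_stages(top):
    """Walk the lineage from the top-level node and group nodes into stages.

    Nodes joined by narrow (non-shuffle) edges share a stage; every shuffle
    edge starts a new stage on the parent side. Returns a list of name lists,
    with the stage that contains `top` first.
    """
    stages = []
    seen_roots = set()
    pending = [top]  # nodes at which a stage starts
    while pending:
        root = pending.pop()
        if id(root) in seen_roots:
            continue
        seen_roots.add(id(root))
        members, stack, visited = [], [root], set()
        while stack:
            node = stack.pop()
            if id(node) in visited:
                continue
            visited.add(id(node))
            members.append(node.name)
            for parent, is_shuffle in node.parents:
                # Shuffle parents open a new stage; narrow parents stay in this one.
                (pending if is_shuffle else stack).append(parent)
        stages.append(members)
    return stages


# Hypothetical lineage: two scans, one filtered, joined after a shuffle.
scan_a = Node("scan_a")
filter_a = Node("filter_a", [(scan_a, False)])
scan_b = Node("scan_b")
join = Node("join", [(filter_a, True), (scan_b, True)])
project = Node("project", [(join, False)])

stages = collect_stages(project)
```

For this toy lineage the walk yields three stages, and each of them could then be given its own resource configuration.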


P.S. let me attach the link for the RDD regeneration explicitly in case it is 
not shown on the mail-list website: 
https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql

Cheers,
Chenghao
On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell wrote:
> I think issue 2 is caused by adaptive query execution. This will break apart 
> queries into multiple jobs; each subsequent job will generate an RDD that is 
> based on previous ones.
>
> As for 1, I am not sure how much you want to expose to an end user here. SQL 
> is declarative, and it does not specify how a query should be executed. I can 
> imagine that you might use different resources for different types of stages, 
> e.g. a scan stage and more compute-heavy stages. This, IMO, should be based 
> on analysis and costing of the plan. For this, RDD-only stage-level 
> scheduling should be sufficient.
>
> > On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu  wrote:
> > > Hi,
> > >
> > > I plan to deploy the stage-level scheduling for Spark SQL to apply some 
> > > fine-grained optimizations over the DAG of stages. However, I am blocked 
> > > by the following issues:
> > >
> > > 1. The current stage-level scheduling supports RDD APIs only. So is there 
> > > a way to reuse the stage-level scheduling for Spark SQL? E.g., how can we 
> > > expose the RDD code (the transformations and actions) from a Spark SQL 
> > > query (with SQL syntax)?
> > > 2. We do not quite understand why a Spark SQL query could trigger 
> > > multiple jobs and have some RDDs regenerated, as posted here. Can anyone 
> > > give us some insight into the reasons and whether we can avoid the RDD 
> > > regeneration to save execution time?
> > >
> > > Thanks in advance.
> > >
> > > Cheers,
> > > Chenghao


Re: Deploying stage-level scheduling for Spark SQL

2022-09-29 Thread Herman van Hovell
I think issue 2 is caused by adaptive query execution. This will break
apart queries into multiple jobs; each subsequent job will generate an RDD
that is based on previous ones.

As for 1, I am not sure how much you want to expose to an end user here.
SQL is declarative, and it does not specify how a query should be executed.
I can imagine that you might use different resources for different types of
stages, e.g. a scan stage and more compute-heavy stages. This, IMO, should
be based on analysis and costing of the plan. For this, RDD-only stage-level
scheduling should be sufficient.

On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu  wrote:

> Hi,
>
> I plan to deploy the stage-level scheduling for Spark SQL to apply some
> fine-grained optimizations over the DAG of stages. However, I am blocked by
> the following issues:
>
> 1. The current stage-level scheduling supports RDD APIs only. So is there
>    a way to reuse the stage-level scheduling for Spark SQL? E.g., how can
>    we expose the RDD code (the transformations and actions) from a Spark
>    SQL query (with SQL syntax)?
> 2. We do not quite understand why a Spark SQL query could trigger multiple
>    jobs and have some RDDs regenerated, as posted *here*. Can anyone give
>    us some insight into the reasons and whether we can avoid the RDD
>    regeneration to save execution time?
>
> Thanks in advance.
>
> Cheers,
> Chenghao
>


Deploying stage-level scheduling for Spark SQL

2022-09-29 Thread Chenghao Lyu
Hi,

I plan to deploy the stage-level scheduling for Spark SQL to apply some 
fine-grained optimizations over the DAG of stages. However, I am blocked by the 
following issues:

1. The current stage-level scheduling supports RDD APIs only. So is there a way 
to reuse the stage-level scheduling for Spark SQL? E.g., how can we expose the 
RDD code (the transformations and actions) from a Spark SQL query (with SQL 
syntax)?
2. We do not quite understand why a Spark SQL query could trigger multiple jobs 
and have some RDDs regenerated, as posted here. Can anyone give us some insight 
into the reasons and whether we can avoid the RDD regeneration to save 
execution time?

Thanks in advance.

Cheers,
Chenghao