Also - I'm not sure if you are subscribed to the devlist, so I am adding your direct address here so that you are sure to see the answer (and if you are not subscribed, then by all means, do subscribe).
On Sun, Mar 10, 2024 at 12:01 PM Jarek Potiuk <ja...@potiuk.com> wrote:
> I think before writing an AIP in Confluence, I would encourage you to describe your idea in a shared Google Docs document and explain it there. But before you do that, I'd encourage you to take a close look at, and deep dive into, the implementation of priorities. It might be different than you think: it has priority weight algorithms that allow for the inclusion of downstream/upstream task priorities. Also, because of the way Airflow serializes the tasks, they are re-read and refreshed by Airflow every 30 seconds by default, so whatever priority_weights you set in DAGs will override the priorities that you **might** want to set via an external API.
>
> Note that even today, tasks do not have "priorities" per se. They have "priority_weight" and "weight_rule", which are used to automatically determine the actual priority of the task based on those rules. So there is not a single "priority" you can override; there is a set of database queries that calculate priorities when tasks are eligible for execution, and you cannot simply "set" the priority for a task this way.
>
> But there is a more fundamental problem with the proposal: it seems to violate a basic principle that we have in Airflow, namely that tasks and their behaviour are entirely defined by DAG authors, who have access to the DAG folder and can change the task definitions. See https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html. UI users (and therefore also API users) by definition cannot CHANGE DAG and task definitions. This is by design. They can run/rerun/clear tasks defined by DAG authors, and it's the DAG authors who have ultimate influence over the definition of tasks. If you look very closely at the API, you will find that there is not a single API there that allows you to modify existing task definitions. Not a single one.
>
> The changes ALWAYS come from the DAG folder.
> No exception.
>
> So what you are proposing here is way more than just changing "a priority" of a task - you are proposing a change to a fundamental assumption that Airflow makes: that DAG authors are the only ones who can change a DAG. Now someone else would be able to change the task definition: someone who is not a DAG author, and who can change it independently of the definition in the DAG folder.
>
> And it has far-reaching consequences. For example, we are just discussing a whole series of changes about DAG versioning:
>
> * https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-63%3A+DAG+Versioning
> * https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-64%3A+Keep+TaskInstance+try+history
> * https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-66%3A+Execution+of+specific+DAG+versions
>
> This whole series of changes builds on the assumption that task definitions come via changes in the DAG folder. It will not handle the case where a task definition can also be changed, independently, via other mechanisms such as an API call. Breaking that assumption will make the whole versioning effort much harder, or maybe even impossible.
>
> So I am not sure that the way you described your proposal is correct, and I think the implementation should not do what you propose. Maybe you should consider a different approach if you would like to change the priorities of tasks (note that task priorities are used when the executor decides which of the tasks eligible for execution should be moved from queued to running and given an execution slot).
>
> I think you have three ways, if you want to proceed with your idea:
>
> 1) Implement it by using the fact that Airflow DAGs are Python code.
> If you **really** want to permanently change task priorities, you could simply write your DAGs so that they use some variables as priorities (for example, read from a local JSON file). Then, rather than making an API call to the Airflow webserver, you could change the priorities directly by editing the values stored in those JSON files in the DAG folder. You could also modify the priorities directly in the Python code - that's a bit more complex, but should also be possible. This approach is simple: it does not require implementing new features in Airflow, does not interfere with Airflow's security model or the basic assumptions we have for DAG definitions, and has no long-term effect on things like DAG versioning.
>
> 2) Maybe what you are after is a completely different mechanism for deciding priorities. Currently, this mechanism uses the priority_weight stored per task and the priority weight rules defined in the DAG/task definition, and uses them to calculate the actual priority used when the executor decides which tasks should be picked for execution. A new mechanism would be a completely new feature that would have to be carefully designed and implemented, taking into account that we are in the middle of implementing hybrid executors (https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-61+Hybrid+Execution), and that we are also discussing a multi-tenancy proposal that builds on the way hybrid executors will work (https://cwiki.apache.org/confluence/display/AIRFLOW/%5BDRAFT%5D+AIP-67+Multi-tenant+deployment+of+Airflow+components), especially in the context of tenants possibly being starved by other tenants. This is a far more complex task than "set priority for task".
>
> 3) Probably what you are proposing might actually be solved in a different way.
> We have an open proposal from Hussein-Awala to make the priority weight calculation flexible, to the point that you could provide your own priority weight calculation rules, configured via plugins in Airflow. That seems like a much better way to think about customising priorities, because the rule can be provided in the task definition, and the rule could also offer a way to "override" the priority. The big difference compared with an API that sets the priority per task from outside is that the decision is made not when the API call is made, but when priorities are calculated in the Airflow scheduler. That seems a much better place to decide about priorities, and it does not involve changing the task definition.
>
> I'd encourage you to take a close look at what I wrote and to dive deeper into the way priority calculation works.
>
> It might also be very useful for you to look at other discussions and PRs around priority weights that are happening at the moment:
>
> * The proposal from Hussein-Awala I mentioned, to make priority calculation flexible: https://github.com/apache/airflow/pull/36029
> * The PR from Andrey about priority weight validation: https://github.com/apache/airflow/pull/37990
>
> I do not want to discourage you from your quest, but while the direction you want to go in is a noble one, doing it the way you proposed likely does not take into account some of Airflow's basic assumptions, or how it impacts ongoing work and the AIPs currently being worked on in Airflow.
>
> J.
>
> On Tue, Feb 27, 2024 at 4:51 PM Alvaro Serrano <alvaroserper2...@gmail.com> wrote:
>
>> Hi,
>>
>> My name is Alvaro and I am a junior developer writing my FMP (Final Master's Project) about a project that uses Apache Airflow. I am interested in developing a new feature for Airflow which I think many people could use.
>>
>> MOTIVATION
>> I really think this is a very useful feature to add.
>> There are times when you want to trigger a DAG run and expect that run to overtake the others already running, because it has priority over them. Currently this is not possible, because task priorities are static: they are set when the DAG is defined, not when a DAG run is triggered.
>>
>> CONSIDERATIONS
>> I propose adding an optional parameter named 'priority' to the DAG run API and Client endpoint. This parameter would overwrite the task priorities for the new DAG run. The scheduler would then do its work as it does now.
>> This would remove the need to create several DAGs that do the same thing but with different priority parameters.
>> Existing users are not affected by this change, because it only adds an optional parameter that can be used or not.
>>
>> I will extend the AIP proposal when I get access to the Confluence AIP Draft.
>>
>> Thanks in advance.
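Option 1 from Jarek's reply (keep priorities in a JSON file inside the DAG folder and read them when the DAG file is parsed) could look roughly like the following. This is a minimal sketch, not an official Airflow pattern: the file name `task_priorities.json`, the helpers `load_priority_weights` and `priority_for`, and the fallback weight are hypothetical choices for illustration. The helpers are plain Python so they can be tested without Airflow; the commented lines show where the value would feed the real `priority_weight` argument of an operator.

```python
import json
from pathlib import Path

# Hypothetical fallback; it matches Airflow's default priority_weight of 1.
DEFAULT_PRIORITY_WEIGHT = 1


def load_priority_weights(path):
    """Read a {task_id: priority_weight} mapping from a JSON file.

    Returns an empty dict if the file does not exist, so the DAG still
    parses (with default weights) when no overrides are provided.
    """
    path = Path(path)
    if not path.exists():
        return {}
    with path.open(encoding="utf-8") as f:
        raw = json.load(f)
    # Coerce each weight to int, since priority_weight is an integer.
    return {task_id: int(weight) for task_id, weight in raw.items()}


def priority_for(weights, task_id, default=DEFAULT_PRIORITY_WEIGHT):
    """Return the configured weight for task_id, or the default."""
    return weights.get(task_id, default)


# Sketch of use inside a DAG file (requires Airflow, so shown as comments):
#
#   from airflow import DAG
#   from airflow.operators.empty import EmptyOperator
#
#   WEIGHTS = load_priority_weights(Path(__file__).parent / "task_priorities.json")
#
#   with DAG(dag_id="example_priorities", schedule=None) as dag:
#       extract = EmptyOperator(
#           task_id="extract",
#           priority_weight=priority_for(WEIGHTS, "extract"),
#       )
```

Because the DAG file is re-parsed periodically, editing the JSON file changes the weights on a later parse, and the change still comes from the DAG folder. Note that this only sets the raw `priority_weight`; the effective priority is still derived from it according to the task's `weight_rule`.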