This task_sla is more and more making me think of a ‘task’ in its own right. It would need to be run in parallel, be non-blocking, not overlap with other runs, etc.
How hard would it be to spawn them as normal workloads on the worker whenever a task runs with an SLA configured? Maybe on a dedicated queue / worker?

On Tue, 20 Jun 2023 at 16:47, Sung Yun <sy...@cornell.edu> wrote:

> Thank you all for your continued engagement and input! It looks like Iaroslav's layout of 3 different labels of SLAs is helping us group the implementation into different categories, so I will organize my own responses in those logical groupings as well.
>
> 1. dag_sla
> 2. task_sla
> 3. task: time_limit_sla
>
> 1. dag_sla
> I am going to lean on Jarek's support in driving us to agree that dag_sla seems like the only one that can stay within the scheduler without incurring an excessive burden on the core infrastructure.
>
> > So, I totally agree about dag level slas. It's very important to have it and according to Sung Yun's proposal it should be implemented not on the scheduler job level.
>
> In response to this, I want to clarify that I am specifically highlighting that dag_sla is the only one that can be supported by the scheduler job. dag_sla isn't a feature that exists right now, and my submission proposes exactly this!
>
> 2. task_sla
> I think Utkarsh's response really helped highlight another compounding issue with SLAs in Airflow, which is that users have such varying definitions of SLAs, and of what they want to do when an SLA is breached.
> On a high level, task_sla relies on a relationship between the dag_run and a specific task within that specific dag_run: it is the time between a dag_run's scheduled start time and the actual start or end time of an individual task within that run.
> Hence, it is impossible for it to be computed in a distributed way that addresses all of the issues highlighted in the AIP, and it needs to be managed by a central process that has access to the single source of truth.
> As Utkarsh suggests, I think this is perhaps doable as a separate process, and it would probably be much safer to do it within a separate process.
> My only concern is that we would be introducing a separate Airflow process that is strictly optional, but one that requires quite a large investment in designing the right abstractions to meet user satisfaction and reliability guarantees.
> It will also require us to review the database's dag/dag_run/task tables' indexing model to make sure that continuous queries to the database will not overload it.
> This isn't simple, because we will have to select tasks in any state (FAILED, SUCCESS or RUNNING) that have not yet had their SLA evaluated, from any dag_run (FAILED, SUCCESS or RUNNING), in order to make sure we don't miss any tasks - because in this paradigm, the concept of SLA triggering is decoupled from a dag_run or task execution.
> A query that selects tasks in ANY state belonging to dag_runs in ANY state is bound to be incredibly expensive - and I discuss this challenge in the Confluence AIP and the Google Doc.
> This will possibly be even more difficult to achieve, because we should have the capacity to support multiple such processes, since we now support High Availability in Airflow.
> So although setting up a separate process decouples the SLA evaluation from the scheduler, we need to acknowledge that we may be introducing a heavy dependency on the metadata database.
>
> My suggestion to leverage the existing Triggerer process and design monitoring Deferrable Operators to execute SLA callbacks has the benefit of reducing the load on the database while achieving similar goals, because it registers the SLA monitoring operator as a TASK in the dag_run that it is associated with, and prevents the dag_run from completing if the SLA has not yet been computed.
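[For readers less familiar with the deferrable-operator idea discussed here, a rough illustrative sketch. This is not Airflow code: a real implementation would subclass BaseOperator and defer to a trigger such as TimeDeltaTrigger; only the date arithmetic such a monitor would perform is shown, with hypothetical names, in plain stdlib Python.]

```python
import datetime as dt

# Sketch of the check an in-dag_run SLA-monitor task could perform after
# deferring until the deadline. Because the monitor is itself a task of the
# dag_run, only RUNNING runs ever need to be queried for it.

def sla_deadline(logical_date: dt.datetime, sla: dt.timedelta) -> dt.datetime:
    """Wall-clock time by which the monitored task must have finished."""
    return logical_date + sla

def evaluate_sla(logical_date, sla, task_end_time, now):
    """Return True if the SLA is breached at evaluation time `now`.
    `task_end_time` is None while the monitored task is still running."""
    deadline = sla_deadline(logical_date, sla)
    if task_end_time is not None and task_end_time <= deadline:
        return False          # task finished in time
    return now > deadline     # still running (or finished) past the deadline
```

A monitor built this way would fire the user's SLA-miss callback when `evaluate_sla` returns True, e.g. for a run with logical date midnight and a 1-hour SLA, a task still unfinished at 02:00 is a breach.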
> This means that our query will be strictly limited to just the dag_runs in RUNNING state - this is a HUGE difference from having to query dag_runs in all states in a separate process, because we are merely attaching a few additional tasks to be executed within existing dag_runs.
>
> In summary: I'm open to this idea, I just have not been able to think of a way to manage this without overloading the scheduler or the database.
>
> 3. task: time_limit_sla
> Jarek: That sounds like a great idea that we could group into this AIP - I will make some time to add some code snippets to the AIP to make this idea a bit clearer to everyone reading it in preparation for the vote.
>
> Sung
>
> On Sun, Jun 18, 2023 at 9:38 PM utkarsh sharma <utkarshar...@gmail.com> wrote:
>
> > > This can be IMHO implemented on the task level. We currently have timeout implemented this way - whenever we start a task, we can have a signal handler registered with "real" time that will cancel the task.
> > > But I can imagine a similar approach with a signal that propagates the information that the task exceeded the time it has been allocated but would not stop it - just propagate the information (in the form of the current way we do callbacks, for example, or maybe (even better) run it only in the context of the task to signal a "soft timeout" per task:
> > >
> > > > signal.signal(signal.SIGALRM, self.handle_timeout)
> > > > signal.setitimer(signal.ITIMER_REAL, self.seconds)
> > >
> > > This has the advantage that it is fully distributed - i.e. we do not need anything to monitor 1000s of running tasks to decide whether an SLA has been breached.
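[The two quoted `signal` lines can be expanded into a small self-contained sketch of the "soft timeout" idea: the SIGALRM handler only records the breach instead of cancelling the task. The `SoftTimeout` name is illustrative, not an Airflow API; this is POSIX-only and must run in the main thread.]

```python
import signal
import time

class SoftTimeout:
    """Flag, but do not interrupt, work that exceeds a soft time limit."""

    def __init__(self, soft_time_limit: float):
        self.soft_time_limit = soft_time_limit
        self.breached = False

    def _handle_alarm(self, signum, frame):
        # Do NOT raise: the task keeps running, we only record the breach
        # (a real implementation might invoke an SLA-miss callback here).
        self.breached = True

    def __enter__(self):
        signal.signal(signal.SIGALRM, self._handle_alarm)
        signal.setitimer(signal.ITIMER_REAL, self.soft_time_limit)
        return self

    def __exit__(self, *exc):
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        return False

with SoftTimeout(0.05) as st:
    time.sleep(0.2)   # "task" work that overruns the soft limit
print(st.breached)    # the alarm fired and was recorded; the work completed
```

As the thread notes below, this is not guaranteed: a long-running C-level call that never returns to the interpreter will delay the Python-level handler until it does.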
> > > It's the task itself that will get the "soft" timeout and propagate it (and then whoever receives the callback can decide what to do next) - and this "callback" can happen either in the task context or in a DagFileProcessor context as we do currently - though the in-task processing seems much more distributed and scalable in nature.
> > > There is one watch-out here: this is not **guaranteed** to work. There are cases, which we have already seen, where the SIGALRM is not going to be handled locally - when the task uses a long-running C-level function that is not written to react to signals generated in Python (think a low-level, long-running Pandas C-method call that does not check for signals in a long-running loop). That however could probably be handled by one more process fork - having a dedicated child process that would monitor running tasks from a separate process - and we could actually improve both timeout and SLA handling by introducing such an extra forked process to handle timeout / task-level time_limit_sla, so IMHO this is an opportunity to improve things.
> >
> > Building on what Jarek mentioned: if we can enable the scheduler to emit events for DAGs with an SLA configured in the following cases:
> > 1. DAG starts executing
> > 2. Task starts executing (for every task)
> > 3. Task stops executing (for every task)
> > 4. DAG stops executing
> >
> > And have a separate process (per dag run) that can keep monitoring such events and execute a callback in the following circumstances:
> > 1. DAG-level SLA miss
> >    - When the entire DAG didn't finish in a specific time
> > 2. Task-level SLA miss
> >    - Counting time from the start of the DAG to the end of a task.
> >    - Start of a task to end of a task.
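[The event-based monitor described in this message could be sketched roughly as follows. Purely illustrative: the event tuples, SLA shapes, and function name are invented for this sketch and are not an Airflow or AIP-57 API.]

```python
import datetime as dt

# A per-dag_run monitor loop would call something like this every interval
# (e.g. 1 sec), feeding it the start/stop events emitted by the scheduler.

def find_sla_misses(events, dag_sla, task_slas, now):
    """events: list of (kind, task_id, timestamp), kind in
    {"dag_start", "task_start", "task_end", "dag_end"}.
    task_slas: task_id -> timedelta, counted from the start of the DAG
    to the end of the task. Returns a list of breach records."""
    misses = []
    dag_start = next(ts for kind, _, ts in events if kind == "dag_start")
    dag_end = next((ts for kind, _, ts in events if kind == "dag_end"), None)
    # DAG-level SLA miss: the entire run didn't finish within dag_sla.
    if (dag_end or now) - dag_start > dag_sla:
        misses.append(("dag", None))
    # Task-level SLA miss: unfinished tasks count up to `now`.
    for task_id, sla in task_slas.items():
        task_end = next((ts for kind, t, ts in events
                         if kind == "task_end" and t == task_id), None)
        if (task_end or now) - dag_start > sla:
            misses.append(("task", task_id))
    return misses
```

Because unfinished work is compared against `now`, this addresses the "must wait for a terminal state" problem: a still-running task or run is reported as soon as its deadline passes.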
> > I think the above approach should address the issues listed in AIP-57
> > <https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-57+SLA+as+a+DAG-Level+Feature>
> > 1. Since we have a separate process, we no longer have to wait for the Tasks/DAG to be in the SUCCESS/SKIPPED state or any other terminal state. In this process, we can have a loop executing periodically at intervals of e.g. 1 sec to monitor SLA misses by watching the events data and the task_instance table.
> > 2. For manually/Dataset-triggered dags, we no longer have a dependency on a fixed schedule; everything we need to evaluate an SLA miss is already present in the events for that specific DAG.
> > 3. This approach also enables us to run callbacks at the task level.
> > 4. We can remove the calls to sla_miss_callbacks every time *dag_run.update_state* is called.
> >
> > A couple of things I'm not sure about are:
> > 1. Where to execute the callbacks. Executing a callback in the same process as the monitoring process has a downside: if the callback takes a long time to execute, it will probably delay other SLA callbacks.
> > 2. The context of execution of the callback - we have to maintain the same context in which the callback was defined.
> >
> > Would love to know other people's thoughts on this :)
> >
> > Thanks,
> > Utkarsh Sharma
> >
> > On Sun, Jun 18, 2023 at 4:08 PM Iaroslav Poskriakov <yaroslavposkrya...@gmail.com> wrote:
> >
> > > I want to say that Airflow is a very popular project, and the ways of calculating SLAs differ because of different business cases. If possible, we should support most of them out of the box.
> > >
> > > On Sun, 18 Jun 2023 at 13:30, Iaroslav Poskriakov <yaroslavposkrya...@gmail.com> wrote:
> > >
> > > > So, I totally agree about dag-level SLAs.
> > > > It's very important to have it, and according to Sung Yun's proposal it should be implemented not at the scheduler job level.
> > > >
> > > > Regarding the second way of determining SLA: <task state STARTED> --> ..<doesn't matter what happened>.. --> <task state SUCCESS>.
> > > > It's very helpful when we want to achieve not a technical SLA but a business SLA for the team that is using that DAG. Between those two states anything could happen, and at the end we might want to understand the high-level SLA for the task - because I guess it doesn't matter for the business that the path of states of the task was something like: STARTED -> RUNNING -> FAILED -> RUNNING -> FAILED -> RUNNING -> SUCCESS. And when something like that is happening, it can be helpful to have a way of automatically recognizing that the expected time for the task has crossed the border.
> > > >
> > > > I agree that for the scheduler it can be too heavy. Also, for that purpose we need to have some process running in parallel with the task. It can be, for example, one more job running on the same machine as the Scheduler, or not on the same one.
> > > >
> > > > About the third part of my proposal - time for the task in the RUNNING state - I agree with you, Jarek. We can implement it at the task level. To me that seems good.
> > > >
> > > > Yaro1
> > > >
> > > > On Sun, 18 Jun 2023 at 08:12, Jarek Potiuk <ja...@potiuk.com> wrote:
> > > >
> > > >> I am also for DAG-level SLA only (but maybe there are some twists).
> > > >>
> > > >> And I hope (since Sung Yun has not given up on that) - maybe this is the right time for others here to chime in, and maybe it will let the vote go on?
> > >> I think it would be great to get the SLA feature sorted out, so that we have a chance to stop answering "yeah, we know SLA is broken, it has always been". It would be nice to say "yeah, the old deprecated SLA is broken, but we have this new mechanism(s) that replaces it". The one proposed by Sung has a good chance of being such a replacement.
> > >>
> > >> I think having a task-level SLA managed by the Airflow framework might indeed be too costly and does not fit well in the current architecture. I think attempting to monitor from the scheduler how long a given task runs is simply a huge overkill. Generally speaking, the scheduler (as surprising as it might be for anyone) does not monitor executing tasks (at least principally speaking). It merely submits the tasks to the executor and lets the executor handle all kinds of monitoring of what is being executed and when - and then, depending on the different types of executors, there are various conditions for when and how a task is executed, and various ways you can define different kinds of task SLAs. Or at least this is how I think about the distributed nature of Airflow on a "logical" level. Once a task is queued for execution, the scheduler takes its hands off and turns its attention to tasks that are not yet scheduled and should be, or tasks that are scheduled but not queued yet.
> > >>
> > >> But maybe some of the SLA "task" expectations can be implemented in a limited version serving very limited cases on a task level?
> > >>
> > >> Referring to what Yaro1 wrote:
> > >>
> > >> > 1. It doesn't matter for us how long we are spending time on some specific task.
> > >> > It's important to have an understanding of the lag between the execution_date of the dag and the success state for the task. We can call it dag_sla. It's similar to the current implementation of manage_slas.
> > >>
> > >> This is basically what Sung proposes, I believe.
> > >>
> > >> > 2. It's important to have an understanding of, and to manage, how long some specific task is working. In my opinion "working" is the state between the task's last start_date and the task's first (after the last start_date) SUCCESS state. So, for example, for a task placed in the FAILED state we still have to check the SLA in that strategy. We can call it task_sla.
> > >>
> > >> I am not sure if I understand it, but if I do, then this is the "super costly" SLA processing that we should likely avoid. I would love to hear, however, what some specific use cases are that we could show here - maybe there are other ways we can achieve similar things.
> > >>
> > >> > 3. Sometimes we need to manage the time for the task in the RUNNING state. We can call it time_limit_sla.
> > >>
> > >> This can be IMHO implemented on the task level. We currently have timeout implemented this way - whenever we start a task, we can have a signal handler registered with "real" time that will cancel the task.
> > >> But I can imagine a similar approach with a signal that propagates the information that the task exceeded the time it has been allocated but would not stop it - just propagate the information (in the form of the current way we do callbacks, for example, or maybe (even better) run it only in the context of the task to signal a "soft timeout" per task:
> > >>
> > >> > signal.signal(signal.SIGALRM, self.handle_timeout)
> > >> > signal.setitimer(signal.ITIMER_REAL, self.seconds)
> > >>
> > >> This has the advantage that it is fully distributed - i.e. we do not need anything to monitor 1000s of running tasks to decide whether an SLA has been breached. It's the task itself that will get the "soft" timeout and propagate it (and then whoever receives the callback can decide what to do next) - and this "callback" can happen either in the task context or in a DagFileProcessor context as we do currently - though the in-task processing seems much more distributed and scalable in nature.
> > >> There is one watch-out here: this is not **guaranteed** to work. There are cases, which we have already seen, where the SIGALRM is not going to be handled locally - when the task uses a long-running C-level function that is not written to react to signals generated in Python (think a low-level, long-running Pandas C-method call that does not check for signals in a long-running loop).
> > >> That however could probably be handled by one more process fork - having a dedicated child process that would monitor running tasks from a separate process - and we could actually improve both timeout and SLA handling by introducing such an extra forked process to handle timeout / task-level time_limit_sla, so IMHO this is an opportunity to improve things.
> > >>
> > >> I would love to hear what others think about it :) I think our SLA for fixing SLA is about to run out.
> > >>
> > >> J.
> > >>
> > >> On Thu, Jun 15, 2023 at 4:05 PM Sung Yun <sy...@cornell.edu> wrote:
> > >>
> > >> > Hello!
> > >> >
> > >> > Thank you very much for the feedback on the proposal. I’ve been hoping to get some more traction on this proposal, so it’s great to hear from another user of the feature.
> > >> >
> > >> > I understand that there’s a lot of support for keeping a native task-level SLA feature, and I definitely agree with that sentiment. Our organization very much relies on Airflow to evaluate ‘task_sla’ in order to keep track of which tasks in each dag failed to succeed by an expected time.
> > >> >
> > >> > In the AIP I put together on the Confluence page, and in the Google docs, I have identified why the existing implementation of the task-level SLA feature can be problematic and is often misleading for Airflow users. The feature is also quite costly for the Airflow scheduler and dag_processor.
> > >> >
> > >> > In that sense, the discussion is not about whether or not these SLA features are important to the users, but is much more technical.
Can > a > > > >> > task-level feature be supported in a first-class way as a core > > feature > > > >> of > > > >> > Airflow, or should it be implemented by the users, for example as > > > >> > independent tasks by leveraging Deferrable Operators. > > > >> > > > > >> > My current thought is that only Dag level SLAs can be supported > in a > > > >> > non-disruptive way by the scheduler, and that task level SLAs > should > > > be > > > >> > handled outside of core Airflow infrastructure code. If you > strongly > > > >> > believe otherwise, I think it would be helpful if you could > propose > > an > > > >> > alternative technical solution that solves many of the existing > > > >> problems in > > > >> > the task-level SLA feature. > > > >> > > > > >> > Sent from my iPhone > > > >> > > > > >> > > On Jun 13, 2023, at 1:10 PM, Ярослав Поскряков < > > > >> > yaroslavposkrya...@gmail.com> wrote: > > > >> > > > > > >> > > Mechanism of SLA > > > >> > > > > > >> > > Hi, I read the previous conversation regarding SLA and I think > > > >> removing > > > >> > the > > > >> > > opportunity to set sla for the task level will be a big mistake. > > > >> > > So, the proposed implementation of the task level SLA will not > be > > > >> working > > > >> > > correctly. > > > >> > > > > > >> > > That's why I guess we have to think about the mechanism of using > > > SLA. > > > >> > > > > > >> > > I guess we should check three different cases in general. > > > >> > > > > > >> > > > > > >> > > 1. It doesn't matter for us how long we are spending time on > some > > > >> > specific > > > >> > > task. It's important to have an understanding of the lag between > > > >> > > execution_date of dag and success state for the task. We can > call > > it > > > >> > > dag_sla. It's similar to the current implementation of > > manage_slas. > > > >> > > > > > >> > > > > > >> > > 2. It's important to have an understanding and managing how long > > > some > > > >> > > specific task is working. 
> > >> > > In my opinion "working" is the state between the task's last start_date and the task's first (after the last start_date) SUCCESS state. So, for example, for a task placed in the FAILED state we still have to check the SLA in that strategy. We can call it task_sla.
> > >> > >
> > >> > > 3. Sometimes we need to manage the time for the task in the RUNNING state. We can call it time_limit_sla.
> > >> > >
> > >> > > Those three types of SLA will cover all possible cases.
> > >> > >
> > >> > > So we will have three different strategies for SLA.
> > >> > >
> > >> > > I guess for dag_sla we can use the idea from
> > >> > > https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals
> > >> > >
> > >> > > For task_sla and time_limit_sla I prefer to stay with using SchedulerJob.
> > >> > >
> > >> > > Github: Yaro1
> > >> >
> > >> > ---------------------------------------------------------------------
> > >> > To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
> > >> > For additional commands, e-mail: dev-h...@airflow.apache.org
>
> --
> Sung Yun
> Cornell Tech '20
> Master of Engineering in Computer Science