> Which forking strategy are we exactly proposing?

The important part is that you have a separate process that will run a separate Python interpreter, so that if the task runs "C" code that does not check signals, the "timer" process will be able to stop it regardless (for timeout), and one that can run the "in-parallel" SLA timer. So likely it is:
local task
 | - timeout fork (kills both "children" if fired)
 | - sla timer (runs in parallel to task)
 | - task code

Then when the SLA timer fires, it will just notify - but let the task code run. When the timeout fires, it will kill both child processes.

J.

On Wed, Jun 21, 2023 at 9:22 PM Sung Yun <sy...@cornell.edu> wrote:

> Hi Jarek, I've been mulling over the implementation of (3) task:
> time_limit_sla, and I have some follow-up questions about the
> implementation.
>
> Which forking strategy are we exactly proposing? Currently, we invoke
> task.execute_callable within the taskinstance, which we can effectively
> think of as the parent process for the sake of this discussion.
>
> Are we proposing:
>
> Structure 1
> parent: task.execute_callable
> └ child 1: sla timer
> └ child 2: execution_timeout timer
>
> Or:
>
> Structure 2
> parent: looping process that parses signals from child processes
> └ child 1: sla timer
> └ child 2: execution_timeout timer
> └ child 3: task.execute_callable
>
> And also, are we proposing that the callbacks be executed in the child
> processes (when the timers complete) or in the parent process?
>
> Pierre: great questions...
>
> > How hard would it be to spawn them when a task runs with SLA configured
> > as a normal workload on the worker?
> > Maybe on a dedicated queue / worker?
>
> My current thought is that having a well-abstracted subclass implementation
> of Deferrable Operator may make the most sense for now. I worry that having
> a configuration-driven way of creating SLA monitoring tasks, where they are
> created behind the scenes, would create confusion in the user base.
> Especially so, if there is no dedicated worker pool that will completely
> isolate the monitoring tasks from the resource pool of normal tasks. So I'm
> curious to hear what options we would have in setting up a dedicated worker
> pool to complement this idea.
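[Editor's note] The layout described above could be sketched roughly as follows. This is a minimal, Unix-only illustration (the helper name and its signature are invented for the example, not Airflow's actual implementation): the task runs in a forked child, a soft SLA timer only notifies, and a hard timeout kills the child even if it is stuck in C code.

```python
import multiprocessing
import threading
import time


def run_with_sla_and_timeout(task_fn, sla_seconds, timeout_seconds, on_sla_miss):
    """Run task_fn in a forked child process.

    When the soft SLA timer fires, call on_sla_miss but let the task keep
    running.  When the hard timeout fires, kill the child.  Returns True if
    the task was killed by the hard timeout.  Unix-only (uses fork).
    """
    ctx = multiprocessing.get_context("fork")
    child = ctx.Process(target=task_fn)
    child.start()

    # Soft SLA timer: just notifies; the task keeps running.
    sla_timer = threading.Timer(sla_seconds, on_sla_miss)
    sla_timer.start()
    try:
        # Hard timeout: wait for the child, then kill it if still alive.
        # terminate() sends SIGTERM from outside the child's interpreter,
        # so it works even if the task is stuck in a C-level call that
        # never checks Python-level signals.
        child.join(timeout_seconds)
        timed_out = child.is_alive()
        if timed_out:
            child.terminate()
            child.join()
        return timed_out
    finally:
        sla_timer.cancel()
```

With `sla_seconds < timeout_seconds`, a slow-but-finishing task triggers only the SLA notification, while a hung task is eventually terminated.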
> > Sung > > On Tue, Jun 20, 2023 at 2:08 PM Pierre Jeambrun <pierrejb...@gmail.com> > wrote: > > > This task_sla is more and more making me think of a ‘task’ on its own. It > > would need to be run in parallel, non blocking, not overlap between each > > other, etc… > > > > How hard would it be to spawn them when a task run with SLA configured > as a > > normal workload on the worker ? > > Maybe on a dedicated queue / worker ? > > > > On Tue 20 Jun 2023 at 16:47, Sung Yun <sy...@cornell.edu> wrote: > > > > > Thank you all for your continued engagement and input! It looks like > > > Iaroslav's layout of 3 different labels of SLA's is helping us group > the > > > implementation into different categories, so I will organize my own > > > responses in those logical groupings as well. > > > > > > 1. dag_sla > > > 2. task_sla > > > 3. task: time_limit_sla > > > > > > 1. dag_sla > > > I am going to lean in on Jarek's support in driving us to agree on the > > fact > > > that, dag_sla seems like the only one that can stay within the > scheduler > > > without incurring an excessive burden on the core infrastructure. > > > > > > > So, I totally agree about dag level slas. It's very important to have > > it > > > and according to Sung Yun proposal it should be implemented not on the > > > scheduler job level. > > > > > > In response to this, I want to clarify that I am specifically > > highlighting > > > that dag_sla is the only one that can be supported by the scheduler > job. > > > Dag_sla isn't a feature that exists right now, and my submission > proposes > > > exactly this! > > > > > > 2. task_sla > > > I think Utkarsh's response really helped highlight another compounding > > > issue with SLAs in Airflow, which is that users have such varying > > > definition of SLAs, and what they want to do when that SLA is breached. 
> > > On a high level, task_sla relies on a relationship between the dag_run
> > > and a specific task within that specific dag_run: it is the time between
> > > a dag_run's scheduled start time and the actual start or end time of an
> > > individual task within that run.
> > > Hence, it is impossible for it to be computed in a distributed way that
> > > addresses all of the issues highlighted in the AIP, and it needs to be
> > > managed by a central process that has access to the single source of
> > > truth.
> > > As Utkarsh suggests, I think this is perhaps doable as a separate
> > > process, and it probably would be much safer to do it within a separate
> > > process.
> > > My only concern is that we would be introducing a separate Airflow
> > > process, one that is strictly optional, but that requires quite a large
> > > amount of investment in designing the right abstractions to meet user
> > > satisfaction and reliability guarantees.
> > > It will also require us to review the database's dag/dag_run/task tables'
> > > indexing model to make sure that continuous queries to the database will
> > > not overload it.
> > > This isn't simple, because we will have to select tasks in any state
> > > (FAILED, SUCCESS or RUNNING) that have not yet had their SLA evaluated,
> > > from any dagRun (FAILED, SUCCESS or RUNNING), in order to make sure we
> > > don't miss any tasks - because in this paradigm, the concept of SLA
> > > triggering is decoupled from a dagrun or task execution.
> > > A query that selects tasks in ANY state from dag_runs in ANY state is
> > > bound to be incredibly expensive - and I discuss this challenge in the
> > > Confluence AIP and the Google Doc.
> > > This will possibly be even more difficult to achieve, because we should
> > > have the capacity to support multiple processes since we now support High
> > > Availability in Airflow.
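[Editor's note] The query-scope point above can be made concrete with a toy illustration. The schema and column names below are invented for the example (not Airflow's actual tables): scanning tasks in ANY state from dag_runs in ANY state considers the whole history, while scoping to RUNNING dag_runs shrinks the candidate set.

```python
import sqlite3

# Toy, in-memory stand-in for the metadata database.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dag_run (id INTEGER PRIMARY KEY, state TEXT);
    CREATE TABLE task_instance (
        id INTEGER PRIMARY KEY,
        dag_run_id INTEGER REFERENCES dag_run(id),
        state TEXT,
        sla_evaluated INTEGER DEFAULT 0);
""")
conn.executemany("INSERT INTO dag_run (id, state) VALUES (?, ?)",
                 [(1, "SUCCESS"), (2, "FAILED"), (3, "RUNNING")])
conn.executemany("INSERT INTO task_instance (dag_run_id, state) VALUES (?, ?)",
                 [(1, "SUCCESS"), (1, "FAILED"), (2, "FAILED"),
                  (3, "RUNNING"), (3, "SUCCESS")])

# Standalone-process approach: must consider tasks in ANY state from
# dag_runs in ANY state that have not had their SLA evaluated yet.
any_state = conn.execute("""
    SELECT COUNT(*) FROM task_instance ti
    JOIN dag_run dr ON dr.id = ti.dag_run_id
    WHERE ti.sla_evaluated = 0
""").fetchone()[0]

# Deferrable-operator approach: the monitoring task keeps its dag_run
# RUNNING until the SLA is evaluated, so the scan can be scoped to
# RUNNING dag_runs only.
running_only = conn.execute("""
    SELECT COUNT(*) FROM task_instance ti
    JOIN dag_run dr ON dr.id = ti.dag_run_id
    WHERE ti.sla_evaluated = 0 AND dr.state = 'RUNNING'
""").fetchone()[0]
```

On a real deployment the "any state" set grows with history, which is the cost being discussed; the RUNNING-only set stays bounded by concurrency.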
> > > So although setting up a separate process decouples the SLA evaluation
> > > from the scheduler, we need to acknowledge that we may be introducing a
> > > heavy dependency on the metadata database.
> > >
> > > My suggestion to leverage the existing Triggerer process to design
> > > monitoring Deferrable Operators to execute SLA callbacks has the benefit
> > > of reducing the load on the database while achieving similar goals,
> > > because it registers the SLA monitoring operator as a TASK to the dag_run
> > > that it is associated with, and prevents the dag_run from completing if
> > > the SLA has not yet been computed. This means that our query will be
> > > strictly limited to just the dagRuns in RUNNING state - this is a HUGE
> > > difference from having to query dagruns in all states in a separate
> > > process, because we are merely attaching a few additional tasks to be
> > > executed into existing dag_runs.
> > >
> > > In summary: I'm open to this idea, I just have not been able to think of
> > > a way to manage this without overloading the scheduler or the database.
> > >
> > > 3. task: time_limit_sla
> > > Jarek: That sounds like a great idea that we could group into this AIP -
> > > I will make some time to add some code snippets into the AIP to make this
> > > idea a bit clearer to everyone reading it in preparation for the vote.
> > >
> > > Sung
> > >
> > > On Sun, Jun 18, 2023 at 9:38 PM utkarsh sharma <utkarshar...@gmail.com>
> > > wrote:
> > >
> > > > > This can be IMHO implemented on the task level. We currently have
> > > > > timeout implemented this way - whenever we start the task, we can
> > > > > have a signal handler registered with "real" time that will cancel
> > > > > the task.
> > > > > But I can imagine a similar approach with a signal: propagate the
> > > > > information that the task exceeded the time it has been allocated,
> > > > > but do not stop it - just propagate the information (in the form of
> > > > > the current way we do callbacks, for example, or maybe (even better)
> > > > > only run it in the context of the task to signal a "soft timeout"
> > > > > per task:
> > > > >
> > > > > > signal.signal(signal.SIGALRM, self.handle_timeout)
> > > > > > signal.setitimer(signal.ITIMER_REAL, self.seconds)
> > > > >
> > > > > This has an advantage that it is fully distributed - i.e. we do not
> > > > > need anything to monitor 1000s of running tasks to decide if an SLA
> > > > > has been breached. It's the task itself that will get the "soft"
> > > > > timeout and propagate it (and then whoever receives the callback can
> > > > > decide what to do next) - and this "callback" can happen in either
> > > > > the task context, or it could be done in a DagFileProcessor context
> > > > > as we do currently - though the in-task processing seems much more
> > > > > distributed and scalable in nature.
> > > > > There is one watch-out here: this is not **guaranteed** to work.
> > > > > There are cases, which we have already seen, where the SIGALRM is
> > > > > not going to be handled locally - when the task uses a long-running
> > > > > C-level function that is not written to react to signals generated
> > > > > in Python (think of a low-level long-running Pandas C-method call
> > > > > that does not check signals in a long-running loop).
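[Editor's note] The quoted two-line snippet could be fleshed out into a self-contained sketch along these lines. The class name and callback are illustrative only (not actual Airflow code), and it is Unix-only since SIGALRM/`setitimer` are not available on Windows:

```python
import signal
import time


class SoftTimeout:
    """Soft per-task time limit: when the wall-clock timer fires, record
    the breach (and optionally run a callback) but let the task keep going.

    Unix-only; must be used from the main thread, since Python delivers
    signals there.  A fuller version would also restore the previous
    SIGALRM handler on exit.
    """

    def __init__(self, seconds, on_breach=None):
        self.seconds = seconds
        self.on_breach = on_breach
        self.breached = False

    def handle_timeout(self, signum, frame):
        # Note: does NOT raise, so the task is not interrupted.
        self.breached = True
        if self.on_breach:
            self.on_breach()

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, *exc):
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer
        return False
```

A task wrapped in `with SoftTimeout(limit, on_breach=notify):` keeps running after the timer fires; only `breached` flips and the callback runs - which is exactly the "notify but don't stop" behaviour described above, with the same C-code caveat.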
That, however, probably could be
> > > > > handled by one more process fork: have a dedicated child process
> > > > > that would monitor running tasks from a separate process - and we
> > > > > could actually improve both timeout and SLA handling by introducing
> > > > > such an extra forked process to handle timeout / task-level
> > > > > time_limit_sla, so IMHO this is an opportunity to improve things.
> > > >
> > > > Building on what Jarek mentioned, if we can enable the scheduler to
> > > > emit events for DAGs with SLA configured in cases of:
> > > > 1. DAG starts executing
> > > > 2. Task starts executing (for every task)
> > > > 3. Task stops executing (for every task)
> > > > 4. DAG stops executing
> > > >
> > > > And have a separate process (per dag run) that can keep monitoring
> > > > such events and execute a callback in the following circumstances:
> > > > 1. DAG-level SLA miss
> > > > - When the entire DAG didn't finish in a specific time
> > > > 2. Task-level SLA miss
> > > > - Counting time from the start of the DAG to the end of a task.
> > > > - Start of a task to end of a task.
> > > >
> > > > I think the above approach should address the issues listed in AIP-57
> > > > <
> > > > https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-57+SLA+as+a+DAG-Level+Feature
> > > > >
> > > > 1. Since we have a separate process, we no longer have to wait for the
> > > > Tasks/DAG to be in the SUCCESS/SKIPPED state or any other terminal
> > > > state. In this process, we can have a loop executing periodically in
> > > > intervals of e.g. 1 sec to monitor SLA misses by monitoring events
> > > > data and the task_instance table.
> > > > 2. For manually/Dataset-triggered dags, we no longer have a dependency
> > > > on a fixed schedule; everything we need to evaluate an SLA miss is
> > > > already present in the events for that specific DAG.
> > > > 3.
This approach also enables us to run callbacks on the task level.
> > > > 4. We can remove the calls to sla_miss_callbacks every time
> > > > *dag_run.update_state* is called.
> > > >
> > > > A couple of things I'm not sure about are:
> > > > 1. Where to execute the callbacks. Executing a callback in the same
> > > > process as the monitoring process has a downside: if the callback
> > > > takes a long time to execute, it will probably cause other SLA
> > > > callbacks to be delayed.
> > > > 2. The context of execution of the callback - we have to maintain the
> > > > same context in which the callback is defined.
> > > >
> > > > Would love to know other people's thoughts on this :)
> > > >
> > > > Thanks,
> > > > Utkarsh Sharma
> > > >
> > > > On Sun, Jun 18, 2023 at 4:08 PM Iaroslav Poskriakov <
> > > > yaroslavposkrya...@gmail.com> wrote:
> > > >
> > > > > I want to say that airflow is a very popular project and the ways
> > > > > of calculating SLA are different, because of different business
> > > > > cases. And if it's possible we should support most of them out of
> > > > > the box.
> > > > >
> > > > > On Sun, Jun 18, 2023 at 13:30, Iaroslav Poskriakov <
> > > > > yaroslavposkrya...@gmail.com> wrote:
> > > > >
> > > > > > So, I totally agree about dag-level slas. It's very important to
> > > > > > have it, and according to Sung Yun's proposal it should be
> > > > > > implemented not on the scheduler job level.
> > > > > >
> > > > > > Regarding the second way of determining SLA: <task state STARTED>
> > > > > > --> ..<doesn't matter what happened>.. --> <task state SUCCESS>.
> > > > > > It's very helpful when we want to achieve not a technical SLA but
> > > > > > a business SLA for the team which is using that DAG. Because
> > > > > > between those two states anything could happen, and at the end we
> > > > > > might want to understand the high-level SLA for the task.
Because it doesn't matter for > > business I > > > > > guess > > > > > > that path of states of the task was something like: STARTED -> > > > RUNNING > > > > -> > > > > > > FAILED -> RUNNING -> FAILED -> RUNNING -> SUCCESS. And in case > when > > > > > > something similar is happening it can be helpful to have an > > > opportunity > > > > > of automatically > > > > > > recognizing that the expected time for the task crossed the > > border. > > > > > > > > > > > > I agree that for the scheduler it can be too heavy. And also for > > that > > > > > > purpose we need to have some process which is running in parallel > > > with > > > > > the > > > > > > task. It can be one more job for example which is running on the > > same > > > > > > machine as Scheduler, or not on the same. > > > > > > > > > > > > > > > > > > About the third part of my proposal - time for the task in the > > > > > > RUNNING state. I agree with you, Jarek. We can implement it on > the > > > task > > > > > > level. For me it seems good. > > > > > > > > > > > > Yaro1 > > > > > > > > > > > > вс, 18 июн. 2023 г. в 08:12, Jarek Potiuk <ja...@potiuk.com>: > > > > > > > > > > > >> I am also for DAG level SLA only (but maybe there are some > > twists). > > > > > >> > > > > > >> And I hope (since Sung Yun has not given up on that) - maybe > that > > is > > > > the > > > > > >> right time that others here will chime in and maybe it will let > > the > > > > vote > > > > > >> go > > > > > >> on? I think it would be great to get the SLA feature sorted out > so > > > > that > > > > > we > > > > > >> have a chance to stop answering ("yeah, we know SLA is broken, > it > > > has > > > > > >> always been"). It would be nice to say "yeah the old deprecated > > SLA > > > is > > > > > >> broken, but we have this new mechanism(s) that replace it". The > > one > > > > > >> proposed by Sung has a good chance of being such a replacement. 
> > > > > >> > > > > > >> I think having a task-level SLA managed by the Airflow framework > > > might > > > > > >> indeed be too costly and does not fit well in the current > > > > architecture. > > > > > I > > > > > >> think attempting to monitor how long a given task runs by the > > > > scheduler > > > > > is > > > > > >> simply a huge overkill. Generally speaking - scheduler (as > > > surprising > > > > it > > > > > >> might be for anyone) does not monitor executing tasks (at least > > > > > >> principally > > > > > >> speaking). It merely submits the tasks to execute to executor > and > > > let > > > > > >> executor handle all kinds of monitoring of what is being > executed > > > > when, > > > > > >> and > > > > > >> then - depending on the different types of executors there are > > > various > > > > > >> conditions when and how task is being executed, and various ways > > how > > > > you > > > > > >> can define different kinds of task SLAs. Or at least this is > how I > > > > think > > > > > >> about the distributed nature of Airflow on a "logical" level. > Once > > > > task > > > > > is > > > > > >> queued for execution, the scheduler takes its hands off and > turns > > > its > > > > > >> attention to tasks that are not yet scheduled and should be or > > tasks > > > > > that > > > > > >> are scheduled but not queued yet. > > > > > >> > > > > > >> But maybe some of the SLA "task" expectations can be > implemented > > > in a > > > > > >> limited version serving very limited cases on a task level? > > > > > >> > > > > > >> Referring to what Yaro1 wrote: > > > > > >> > > > > > >> > 1. It doesn't matter for us how long we are spending time on > > some > > > > > >> specific > > > > > >> task. It's important to have an understanding of the lag between > > > > > >> execution_date of dag and success state for the task. We can > call > > it > > > > > >> dag_sla. It's similar to the current implementation of > > manage_slas. 
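[Editor's note] The dag_sla notion quoted above - the lag between the dag run's execution_date and the task reaching SUCCESS, regardless of how long any individual attempt ran - boils down to a check like this (the helper and its parameter names are illustrative only, not an Airflow API):

```python
from datetime import datetime, timedelta, timezone


def dag_sla_missed(execution_date, sla, task_success_time=None, now=None):
    """dag_sla check: the task must reach SUCCESS within `sla` of the dag
    run's execution_date.  Retries and failures in between are irrelevant;
    only when (or whether) SUCCESS was reached matters.
    """
    now = now or datetime.now(timezone.utc)
    deadline = execution_date + sla
    # Breach if the task succeeded after the deadline...
    finished_late = task_success_time is not None and task_success_time > deadline
    # ...or has not succeeded at all and the deadline has passed.
    still_unfinished_late = task_success_time is None and now > deadline
    return finished_late or still_unfinished_late
```

Note this is evaluable from the single source of truth (the metadata database) without observing the task while it runs, which is why it fits the scheduler-side approach.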
> > > > > >> > > > > > >> This is basically what Sung proposes, I believe. > > > > > >> > > > > > >> > > > > > >> > 2. It's important to have an understanding and managing how > long > > > > some > > > > > >> specific task is working. In my opinion working is the state > > between > > > > > task > > > > > >> last start_date and task first (after last start_date) SUCCESS > > > state. > > > > So > > > > > >> for example for the task which is placed in FAILED state we > still > > > have > > > > > to > > > > > >> check an SLA in that strategy. We can call it task_sla. > > > > > >> > > > > > >> I am not sure if I understand it, but If I do, then this is the > > > "super > > > > > >> costly" SLA processing that we should likely avoid. I would love > > to > > > > hear > > > > > >> however, what are some specific use cases that we could show > here, > > > > maybe > > > > > >> there are other ways we can achieve similar things. > > > > > >> > > > > > >> > > > > > >> > 3. Sometimes we need to manage time for the task in the > RUNNING > > > > state. > > > > > >> We > > > > > >> can call it time_limit_sla. > > > > > >> > > > > > >> This can be IMHO implemented on the task level. We currently > have > > > > > timeout > > > > > >> implemented this way - whenever we start the task, we can have a > > > > signal > > > > > >> handler registered with "real" time registered that will cancel > > the > > > > > task. 
> > > > > >> But I can imagine a similar approach with a signal: propagate
> > > > > >> the information that the task exceeded the time it has been
> > > > > >> allocated, but do not stop it - just propagate the information
> > > > > >> (in the form of the current way we do callbacks, for example, or
> > > > > >> maybe (even better) only run it in the context of the task to
> > > > > >> signal a "soft timeout" per task:
> > > > > >>
> > > > > >> > signal.signal(signal.SIGALRM, self.handle_timeout)
> > > > > >> > signal.setitimer(signal.ITIMER_REAL, self.seconds)
> > > > > >>
> > > > > >> This has an advantage that it is fully distributed - i.e. we do
> > > > > >> not need anything to monitor 1000s of running tasks to decide if
> > > > > >> an SLA has been breached. It's the task itself that will get the
> > > > > >> "soft" timeout and propagate it (and then whoever receives the
> > > > > >> callback can decide what to do next) - and this "callback" can
> > > > > >> happen in either the task context, or it could be done in a
> > > > > >> DagFileProcessor context as we do currently - though the in-task
> > > > > >> processing seems much more distributed and scalable in nature.
> > > > > >> There is one watch-out here: this is not **guaranteed** to work.
> > > > > >> There are cases, which we have already seen, where the SIGALRM
> > > > > >> is not going to be handled locally - when the task uses a
> > > > > >> long-running C-level function that is not written to react to
> > > > > >> signals generated in Python (think of a low-level long-running
> > > > > >> Pandas C-method call that does not check signals in a
> > > > > >> long-running loop).
That however probably could be handled by one > > > more > > > > > >> process fork and have a dedicated child process that would > monitor > > > > > running > > > > > >> tasks from a separate process - and we could actually improve > both > > > > > timeout > > > > > >> and SLA handling by introducing such extra forked process to > > handle > > > > > >> timeout/task level time_limit_sla, so IMHO this is an > opportunity > > to > > > > > >> improve things. > > > > > >> > > > > > >> I would love to hear what others think about it :)? I think our > > SLA > > > > for > > > > > >> fixing SLA is about to run out. > > > > > >> > > > > > >> > > > > > >> J. > > > > > >> > > > > > >> > > > > > >> On Thu, Jun 15, 2023 at 4:05 PM Sung Yun <sy...@cornell.edu> > > wrote: > > > > > >> > > > > > >> > Hello! > > > > > >> > > > > > > >> > Thank you very much for the feedback on the proposal. I’ve > been > > > > hoping > > > > > >> to > > > > > >> > get some more traction on this proposal, so it’s great to hear > > > from > > > > > >> another > > > > > >> > user of the feature. > > > > > >> > > > > > > >> > I understand that there’s a lot of support for keeping a > native > > > task > > > > > >> level > > > > > >> > SLA feature, and I definitely agree with that sentiment. Our > > > > > >> organization > > > > > >> > very much relies on Airflow to evaluate ‘task_sla’ in order to > > > keep > > > > > >> track > > > > > >> > of which tasks in each dags failed to succeed by an expected > > time. > > > > > >> > > > > > > >> > In the AIP I put together on the Confluence page, and in the > > > Google > > > > > >> docs, > > > > > >> > I have identified why the existing implementation of the task > > > level > > > > > SLA > > > > > >> > feature can be problematic and is often misleading for Airflow > > > > users. > > > > > >> The > > > > > >> > feature is also quite costly for Airflow scheduler and > > > > dag_processor. 
> > > > > >> > > > > > > >> > In that sense, the discussion is not about whether or not > these > > > SLA > > > > > >> > features are important to the users, but much more technical. > > Can > > > a > > > > > >> > task-level feature be supported in a first-class way as a core > > > > feature > > > > > >> of > > > > > >> > Airflow, or should it be implemented by the users, for example > > as > > > > > >> > independent tasks by leveraging Deferrable Operators. > > > > > >> > > > > > > >> > My current thought is that only Dag level SLAs can be > supported > > > in a > > > > > >> > non-disruptive way by the scheduler, and that task level SLAs > > > should > > > > > be > > > > > >> > handled outside of core Airflow infrastructure code. If you > > > strongly > > > > > >> > believe otherwise, I think it would be helpful if you could > > > propose > > > > an > > > > > >> > alternative technical solution that solves many of the > existing > > > > > >> problems in > > > > > >> > the task-level SLA feature. > > > > > >> > > > > > > >> > Sent from my iPhone > > > > > >> > > > > > > >> > > On Jun 13, 2023, at 1:10 PM, Ярослав Поскряков < > > > > > >> > yaroslavposkrya...@gmail.com> wrote: > > > > > >> > > > > > > > >> > > Mechanism of SLA > > > > > >> > > > > > > > >> > > Hi, I read the previous conversation regarding SLA and I > think > > > > > >> removing > > > > > >> > the > > > > > >> > > opportunity to set sla for the task level will be a big > > mistake. > > > > > >> > > So, the proposed implementation of the task level SLA will > not > > > be > > > > > >> working > > > > > >> > > correctly. > > > > > >> > > > > > > > >> > > That's why I guess we have to think about the mechanism of > > using > > > > > SLA. > > > > > >> > > > > > > > >> > > I guess we should check three different cases in general. > > > > > >> > > > > > > > >> > > > > > > > >> > > 1. It doesn't matter for us how long we are spending time on > > > some > > > > > >> > specific > > > > > >> > > task. 
It's important to have an understanding of the lag > > between > > > > > >> > > execution_date of dag and success state for the task. We can > > > call > > > > it > > > > > >> > > dag_sla. It's similar to the current implementation of > > > > manage_slas. > > > > > >> > > > > > > > >> > > > > > > > >> > > 2. It's important to have an understanding and managing how > > long > > > > > some > > > > > >> > > specific task is working. In my opinion working is the state > > > > between > > > > > >> task > > > > > >> > > last start_date and task first (after last start_date) > SUCCESS > > > > > state. > > > > > >> So > > > > > >> > > for example for the task which is placed in FAILED state we > > > still > > > > > >> have to > > > > > >> > > check an SLA in that strategy. We can call it task_sla. > > > > > >> > > > > > > > >> > > > > > > > >> > > 3. Sometimes we need to manage time for the task in the > > RUNNING > > > > > >> state. We > > > > > >> > > can call it time_limit_sla. > > > > > >> > > > > > > > >> > > > > > > > >> > > Those three types of SLA will cover all possible cases. > > > > > >> > > > > > > > >> > > > > > > > >> > > So we will have three different strategies for SLA. 
> > > > > >> > > > > > > > >> > > > > > > > >> > > I guess we can use for dag_sla that idea - > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals > > > > > >> > > > > > > > >> > > > > > > > >> > > For task_sla and time_limit_sla I prefer to stay with using > > > > > >> SchedulerJob > > > > > >> > > > > > > > >> > > > > > > > >> > > Github: Yaro1 > > > > > >> > > > > > > >> > > > > > --------------------------------------------------------------------- > > > > > >> > To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org > > > > > >> > For additional commands, e-mail: dev-h...@airflow.apache.org > > > > > >> > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > -- > > > Sung Yun > > > Cornell Tech '20 > > > Master of Engineering in Computer Science > > > > > > > > -- > Sung Yun > Cornell Tech '20 > Master of Engineering in Computer Science >