>
> This can be IMHO implemented on the task level. We currently have timeout
> implemented this way - whenever we start the task, we can have a signal
> handler registered with "real" time registered that will cancel the task.
> But I can imagine similar approach with signal and propagate the
> information that task exceeded the time it has been allocated but would not
> stop it, just propagate the information (in a form of current way we do
> callbacks for example, or maybe (even better) only run it in the context of
> task  to signal "soft timeout" per task:
>
> >             signal.signal(signal.SIGALRM, self.handle_timeout)
> >            signal.setitimer(signal.ITIMER_REAL, self.seconds)
>
> This has an advantage that it is fully distributed - i.e. we do not need
> anything to monitor 1000s of tasks running to decide if SLA has been
> breached. It's the task itself that will get the "soft" timeout and
> propagate it (and then whoever receives the callback can decide what to do
> next - and this "callback" can happen in either the task context or it
> could be done in a DagFileProcessor context as we do currently - though the
> in-task processing seems much more distributed and scalable in nature.
> There is one watch-out here that this is not **guaranteed** to work, there
> are cases, that we already saw that the SIGALRM is not going to be handled
> locally, when the task uses long running C-level function that is not
> written in the way to react to signals generated in Python (thinks
> low-level long-running Pandas c-method call that does not check signal in a
> long-running-loop. That however probably could be handled by one more
> process fork and have a dedicated child process that would monitor running
> tasks from a separate process - and we could actually improve both timeout
> and SLA handling by introducing such extra forked process to handle
> timeout/task level time_limit_sla, so IMHO this is an opportunity to
> improve things.
>


Building on what Jarek mentioned, If we can enable the scheduler to emit
events for DAGs with SLA configured in cases of
1. DAG starts executing
2. Task - start executing(for every task)
3. Task - stop executing(for every task)
4. DAG stops executing

And have a separate process(per dag run) that can keep monitoring such
events and execute a callback in the following circumstances:
1. DAG level SLA miss
    - When the entire DAG didn't finish in a specific time
2. Task-level SLA miss
    - Counting time from the start of the DAG to the end of a task.
    - Start of a task to end of a task.

I think above approch should be addressing issues listed in AIP-57
<https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-57+SLA+as+a+DAG-Level+Feature>
1. Since we have a septate process we no longer have to wait for the
Tasks/DAG to be in the SUCCESS/SKIPPED state or any other terminal state.
In this process, we can have a loop executing periodically in intervals of
ex- 1sec to monitor SLA misses by monitoring events data and task_instance
table.
2. For Manually/Dataset triggered dags, we no longer have a dependency on a
fixed schedule, everything we need to evaluate SLA miss is already present
in Events for that specific DAG.
3. This approach also enables us to run callbacks on the task level.
4. We can remove calls to sla_miss_callbacks every time is called
*dag_run.update_state*

A couple of things I'm not about are -
1. Where to execute the callbacks. Executing a callback in the same process
as the monitoring process can have a downside if the callback takes much
time to execute it will probably cause other SLA callbacks to be delayed.
2. Context of execution of callback, we have to maintain the same context
in which the callback is defined.

Would love to know other people's thoughts on this :)

Thanks,
Utkarsh Sharma


On Sun, Jun 18, 2023 at 4:08 PM Iaroslav Poskriakov <
yaroslavposkrya...@gmail.com> wrote:

> I want to say that airflow is a very popular project and the ways of
> calculating SLA are different. Because of different business cases. And if
> it's possible we should make most of them from the box.
>
> вс, 18 июн. 2023 г. в 13:30, Iaroslav Poskriakov <
> yaroslavposkrya...@gmail.com>:
>
> > So, I totally agree about dag level slas. It's very important to have it
> > and according to Sung Yun proposal it should be implemented not on the
> > scheduler job level.
> >
> > Regarding the second way of determining SLA: <task state STARTED> -->
> > ..<doesn't matter what happened>..  --> <task state SUCCESS>.
> > It's very helpful in the way when we want to achieve not technical SLA
> but
> > business SLA for the team which is using that DAG. Because between those
> > two states anything could happen and at the end we might want to
> understand
> > high level SLA for the task. Because it doesn't matter for business I
> guess
> > that path of states of the task was something like: STARTED -> RUNNING ->
> > FAILED -> RUNNING -> FAILED -> RUNNING -> SUCCESS. And in case when
> > something similar is happening it can be helpful to have an opportunity
> of automatically
> > recognizing  that the expected time for the task crossed the border.
> >
> > I agree that for the scheduler it can be too heavy. And also for that
> > purpose we need to have some process which is running in parallel with
> the
> > task. It can be one more job for example which is running on the same
> > machine as Scheduler, or not on the same.
> >
> >
> > About the third part of my proposal - time for the task in the
> > RUNNING state. I agree with you, Jarek. We can implement it on the task
> > level. For me it seems good.
> >
> > Yaro1
> >
> > вс, 18 июн. 2023 г. в 08:12, Jarek Potiuk <ja...@potiuk.com>:
> >
> >> I am also for DAG level SLA only (but maybe there are some twists).
> >>
> >> And I hope (since Sung Yun has not given up on that) - maybe that is the
> >> right time that others here will chime in and maybe it will let the vote
> >> go
> >> on? I think it would be great to get the SLA feature sorted out so that
> we
> >> have a chance to stop answering ("yeah, we know SLA is broken, it has
> >> always been"). It would be nice to say "yeah the old deprecated SLA is
> >> broken, but we have this new mechanism(s) that replace it". The one
> >> proposed by Sung has a good chance of being such a replacement.
> >>
> >> I think having a task-level SLA managed by the Airflow framework might
> >> indeed be too costly and does not fit well in the current architecture.
> I
> >> think attempting to monitor how long a given task runs by the scheduler
> is
> >> simply a huge overkill. Generally speaking - scheduler (as surprising it
> >> might be for anyone) does not monitor executing tasks (at least
> >> principally
> >> speaking). It merely submits the tasks to execute to executor and let
> >> executor handle all kinds of monitoring of what is being executed when,
> >> and
> >> then - depending on the different types of executors there are various
> >> conditions when and how task is being executed, and various ways how you
> >> can define different kinds of task SLAs. Or at least this is how I think
> >> about the distributed nature of Airflow on a "logical" level. Once task
> is
> >> queued for execution, the scheduler takes its hands off and turns its
> >> attention to tasks that are not yet scheduled and should be or tasks
> that
> >> are scheduled but not queued yet.
> >>
> >> But maybe some of the SLA "task" expectations can be  implemented in a
> >> limited version serving very limited cases on a task level?
> >>
> >> Referring to what Yaro1 wrote:
> >>
> >> > 1. It doesn't matter for us how long we are spending time on some
> >> specific
> >> task. It's important to have an understanding of the lag between
> >> execution_date of dag and success state for the task. We can call it
> >> dag_sla. It's similar to the current implementation of manage_slas.
> >>
> >> This is basically what Sung proposes, I believe.
> >>
> >>
> >> > 2. It's important to have an understanding and managing how long some
> >> specific task is working. In my opinion working is the state between
> task
> >> last start_date and task first (after last start_date) SUCCESS state. So
> >> for example for the task which is placed in FAILED state we still have
> to
> >> check an SLA in that strategy. We can call it task_sla.
> >>
> >> I am not sure if I understand it, but If I do, then this is the "super
> >> costly" SLA processing that we should likely avoid. I would love to hear
> >> however, what are some specific use cases that we could show here, maybe
> >> there are other ways we can achieve similar things.
> >>
> >>
> >> > 3. Sometimes we need to manage time for the task in the RUNNING state.
> >> We
> >> can call it time_limit_sla.
> >>
> >> This can be IMHO implemented on the task level. We currently have
> timeout
> >> implemented this way - whenever we start the task, we can have a signal
> >> handler registered with "real" time registered that will cancel the
> task.
> >> But I can imagine similar approach with signal and propagate the
> >> information that task exceeded the time it has been allocated but would
> >> not
> >> stop it, just propagate the information (in a form of current way we do
> >> callbacks for example, or maybe (even better) only run it in the context
> >> of
> >> task  to signal "soft timeout" per task:
> >>
> >> >             signal.signal(signal.SIGALRM, self.handle_timeout)
> >> >            signal.setitimer(signal.ITIMER_REAL, self.seconds)
> >>
> >> This has an advantage that it is fully distributed - i.e. we do not need
> >> anything to monitor 1000s of tasks running to decide if SLA has been
> >> breached. It's the task itself that will get the "soft" timeout and
> >> propagate it (and then whoever receives the callback can decide what to
> do
> >> next - and this "callback" can happen in either the task context or it
> >> could be done in a DagFileProcessor context as we do currently - though
> >> the
> >> in-task processing seems much more distributed and scalable in nature.
> >> There is one watch-out here that this is not **guaranteed** to work,
> there
> >> are cases, that we already saw that the SIGALRM is not going to be
> handled
> >> locally, when the task uses long running C-level function that is not
> >> written in the way to react to signals generated in Python (thinks
> >> low-level long-running Pandas c-method call that does not check signal
> in
> >> a
> >> long-running-loop. That however probably could be handled by one more
> >> process fork and have a dedicated child process that would monitor
> running
> >> tasks from a separate process - and we could actually improve both
> timeout
> >> and SLA handling by introducing such extra forked process to handle
> >> timeout/task level time_limit_sla, so IMHO this is an opportunity to
> >> improve things.
> >>
> >> I would love to hear what others think about it :)? I think our SLA for
> >> fixing SLA is about to run out.
> >>
> >>
> >> J.
> >>
> >>
> >> On Thu, Jun 15, 2023 at 4:05 PM Sung Yun <sy...@cornell.edu> wrote:
> >>
> >> > Hello!
> >> >
> >> > Thank you very much for the feedback on the proposal. I’ve been hoping
> >> to
> >> > get some more traction on this proposal, so it’s great to hear from
> >> another
> >> > user of the feature.
> >> >
> >> > I understand that there’s a lot of support for keeping a native task
> >> level
> >> > SLA feature, and I definitely agree with that sentiment. Our
> >> organization
> >> > very much relies on Airflow to evaluate ‘task_sla’ in order to keep
> >> track
> >> > of which tasks in each dags failed to succeed by an expected time.
> >> >
> >> > In the AIP I put together on the Confluence page, and in the Google
> >> docs,
> >> > I have identified why the existing implementation of the task level
> SLA
> >> > feature can be problematic and is often misleading for Airflow users.
> >> The
> >> > feature is also quite costly for Airflow scheduler and dag_processor.
> >> >
> >> > In that sense, the discussion is not about whether or not these SLA
> >> > features are important to the users, but much more technical. Can a
> >> > task-level feature be supported in a first-class way as a core feature
> >> of
> >> > Airflow, or should it be implemented by the users, for example as
> >> > independent tasks by leveraging Deferrable Operators.
> >> >
> >> > My current thought is that only Dag level SLAs can be supported in a
> >> > non-disruptive way by the scheduler, and that task level SLAs should
> be
> >> > handled outside of core Airflow infrastructure code. If you strongly
> >> > believe otherwise, I think it would be helpful if you could propose an
> >> > alternative technical solution that solves many of the existing
> >> problems in
> >> > the task-level SLA feature.
> >> >
> >> > Sent from my iPhone
> >> >
> >> > > On Jun 13, 2023, at 1:10 PM, Ярослав Поскряков <
> >> > yaroslavposkrya...@gmail.com> wrote:
> >> > >
> >> > > Mechanism of SLA
> >> > >
> >> > > Hi, I read the previous conversation regarding SLA and I think
> >> removing
> >> > the
> >> > > opportunity to set sla for the task level will be a big mistake.
> >> > > So, the proposed implementation of the task level SLA will not be
> >> working
> >> > > correctly.
> >> > >
> >> > > That's why I guess we have to think about the mechanism of using
> SLA.
> >> > >
> >> > > I guess we should check three different cases in general.
> >> > >
> >> > >
> >> > > 1. It doesn't matter for us how long we are spending time on some
> >> > specific
> >> > > task. It's important to have an understanding of the lag between
> >> > > execution_date of dag and success state for the task. We can call it
> >> > > dag_sla. It's similar to the current implementation of manage_slas.
> >> > >
> >> > >
> >> > > 2. It's important to have an understanding and managing how long
> some
> >> > > specific task is working. In my opinion working is the state between
> >> task
> >> > > last start_date and task first (after last start_date) SUCCESS
> state.
> >> So
> >> > > for example for the task which is placed in FAILED state we still
> >> have to
> >> > > check an SLA in that strategy. We can call it task_sla.
> >> > >
> >> > >
> >> > > 3. Sometimes we need to manage time for the task in the RUNNING
> >> state. We
> >> > > can call it time_limit_sla.
> >> > >
> >> > >
> >> > > Those three types of SLA will cover all possible cases.
> >> > >
> >> > >
> >> > > So we will have three different strategies for SLA.
> >> > >
> >> > >
> >> > > I guess we can use for dag_sla that idea -
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals
> >> > >
> >> > >
> >> > > For task_sla and time_limit_sla I prefer to stay with using
> >> SchedulerJob
> >> > >
> >> > >
> >> > > Github: Yaro1
> >> >
> >> > ---------------------------------------------------------------------
> >> > To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
> >> > For additional commands, e-mail: dev-h...@airflow.apache.org
> >> >
> >> >
> >>
> >
>

Reply via email to