This task_sla is more and more making me think of a ‘task’ on its own. It
would need to be run in parallel, non blocking, not overlap between each
other, etc…

How hard would it be to spawn them when a task run with SLA configured as a
normal workload on the worker ?
Maybe on a dedicated queue / worker ?

On Tue 20 Jun 2023 at 16:47, Sung Yun <sy...@cornell.edu> wrote:

> Thank you all for your continued engagement and input! It looks like
> Iaroslav's layout of 3 different labels of SLA's is helping us group the
> implementation into different categories, so I will organize my own
> responses in those logical groupings as well.
>
> 1. dag_sla
> 2. task_sla
> 3. task: time_limit_sla
>
> 1. dag_sla
> I am going to lean in on Jarek's support in driving us to agree on the fact
> that, dag_sla seems like the only one that can stay within the scheduler
> without incurring an excessive burden on the core infrastructure.
>
> > So, I totally agree about dag level slas. It's very important to have it
> and according to Sung Yun proposal it should be implemented not on the
> scheduler job level.
>
> In response to this, I want to clarify that I am specifically highlighting
> that dag_sla is the only one that can be supported by the scheduler job.
> Dag_sla isn't a feature that exists right now, and my submission proposes
> exactly this!
>
> 2. task_sla
> I think Utkarsh's response really helped highlight another compounding
> issue with SLAs in Airflow, which is that users have such varying
> definition of SLAs, and what they want to do when that SLA is breached.
> On a high level, task_sla relies on a relationship between the dag_run, and
> a specific task within that specific dag_run: it is the time between a
> dag_run's scheduled start time, and the actual start or end time of an
> individual task within that run.
> Hence, it is impossible for it to be computed in a distributed way that
> address all of the issues highlighted in the AIP, and needs to be managed
> by a central process that has access to the single source of truth.
> As Utkarsh suggests, I think this is perhaps doable as a separate process,
> and probably would be much safer to do it within a separate process.
> My only concern is that we would be introducing a separate Airflow process,
> that is strictly optional, but one that requires quite a large amount of
> investment in designing the right abstractions to meet user satisfaction
> and reliability guarantees.
> It will also require us to review the database's dag/dag_run/task tables'
> indexing model to make sure that continuous queries to the database will
> not overload it.
> This isn't simple, because we will have to select tasks in any state
> (FAILED, SUCCESS or RUNNING) that has not yet had their SLA evaluated, from
> any dagRun (FAILED, SUCCESS or RUNNING), in order to make sure we don't
> miss any tasks - because in this paradigm, the concept of SLA triggering is
> decoupled from a dagrun or task execution.
> A query that selects tasks in ANY state in ANY state of dag_run is bound to
> be incredibly expensive - and I discuss this challenge in the Confluence
> AIP and the Google Doc.
> This will possibly be even more difficult to achieve, because we should
> have the capacity to support multiple processes since we now support High
> Availability in Airflow.
> So although setting up a separate process decouples the SLA evaluation from
> the scheduler, we need to acknowledge that we may be introducing a heavy
> dependency on the metadata database.
>
> My suggestion to leverage the existing Triggerer process to design
> monitoring Deferrable Operators to execute SLA callbacks has the benefit of
> reducing the load on the database while achieving similar goals, because it
> registers the SLA monitoring operator as a TASK to the dag_run that it is
> associated with, and prevents the dag_run from completing if the SLA has
> not yet computed. This means that our query will be strictly limited to
> just the dagRuns in RUNNING state - this is a HUGE difference from having
> to query dagruns in all states in a separate process, because we are merely
> attaching a few additional tasks to be executed into existing dag_runs.
>
> In summary: I'm open to this idea, I just have not been able to think of a
> way to manage this without overloading the scheduler, or the database.
>
> 3. task: time_limit_sla
> Jarek: That sounds like a great idea that we could group into this AIP - I
> will make some time to add some code snippets into the AIP to make this
> idea a bit clearer to everyone reading it in preparation for the vote
>
>
> Sung
>
> On Sun, Jun 18, 2023 at 9:38 PM utkarsh sharma <utkarshar...@gmail.com>
> wrote:
>
> > >
> > > This can be IMHO implemented on the task level. We currently have
> timeout
> > > implemented this way - whenever we start the task, we can have a signal
> > > handler registered with "real" time registered that will cancel the
> task.
> > > But I can imagine similar approach with signal and propagate the
> > > information that task exceeded the time it has been allocated but would
> > not
> > > stop it, just propagate the information (in a form of current way we do
> > > callbacks for example, or maybe (even better) only run it in the
> context
> > of
> > > task  to signal "soft timeout" per task:
> > >
> > > >             signal.signal(signal.SIGALRM, self.handle_timeout)
> > > >            signal.setitimer(signal.ITIMER_REAL, self.seconds)
> > >
> > > This has an advantage that it is fully distributed - i.e. we do not
> need
> > > anything to monitor 1000s of tasks running to decide if SLA has been
> > > breached. It's the task itself that will get the "soft" timeout and
> > > propagate it (and then whoever receives the callback can decide what to
> > do
> > > next - and this "callback" can happen in either the task context or it
> > > could be done in a DagFileProcessor context as we do currently - though
> > the
> > > in-task processing seems much more distributed and scalable in nature.
> > > There is one watch-out here that this is not **guaranteed** to work,
> > there
> > > are cases, that we already saw that the SIGALRM is not going to be
> > handled
> > > locally, when the task uses long running C-level function that is not
> > > written in the way to react to signals generated in Python (thinks
> > > low-level long-running Pandas c-method call that does not check signal
> > in a
> > > long-running-loop. That however probably could be handled by one more
> > > process fork and have a dedicated child process that would monitor
> > running
> > > tasks from a separate process - and we could actually improve both
> > timeout
> > > and SLA handling by introducing such extra forked process to handle
> > > timeout/task level time_limit_sla, so IMHO this is an opportunity to
> > > improve things.
> > >
> >
> >
> > Building on what Jarek mentioned, If we can enable the scheduler to emit
> > events for DAGs with SLA configured in cases of
> > 1. DAG starts executing
> > 2. Task - start executing(for every task)
> > 3. Task - stop executing(for every task)
> > 4. DAG stops executing
> >
> > And have a separate process(per dag run) that can keep monitoring such
> > events and execute a callback in the following circumstances:
> > 1. DAG level SLA miss
> >     - When the entire DAG didn't finish in a specific time
> > 2. Task-level SLA miss
> >     - Counting time from the start of the DAG to the end of a task.
> >     - Start of a task to end of a task.
> >
> > I think above approch should be addressing issues listed in AIP-57
> > <
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-57+SLA+as+a+DAG-Level+Feature
> > >
> > 1. Since we have a septate process we no longer have to wait for the
> > Tasks/DAG to be in the SUCCESS/SKIPPED state or any other terminal state.
> > In this process, we can have a loop executing periodically in intervals
> of
> > ex- 1sec to monitor SLA misses by monitoring events data and
> task_instance
> > table.
> > 2. For Manually/Dataset triggered dags, we no longer have a dependency
> on a
> > fixed schedule, everything we need to evaluate SLA miss is already
> present
> > in Events for that specific DAG.
> > 3. This approach also enables us to run callbacks on the task level.
> > 4. We can remove calls to sla_miss_callbacks every time is called
> > *dag_run.update_state*
> >
> > A couple of things I'm not about are -
> > 1. Where to execute the callbacks. Executing a callback in the same
> process
> > as the monitoring process can have a downside if the callback takes much
> > time to execute it will probably cause other SLA callbacks to be delayed.
> > 2. Context of execution of callback, we have to maintain the same context
> > in which the callback is defined.
> >
> > Would love to know other people's thoughts on this :)
> >
> > Thanks,
> > Utkarsh Sharma
> >
> >
> > On Sun, Jun 18, 2023 at 4:08 PM Iaroslav Poskriakov <
> > yaroslavposkrya...@gmail.com> wrote:
> >
> > > I want to say that airflow is a very popular project and the ways of
> > > calculating SLA are different. Because of different business cases. And
> > if
> > > it's possible we should make most of them from the box.
> > >
> > > вс, 18 июн. 2023 г. в 13:30, Iaroslav Poskriakov <
> > > yaroslavposkrya...@gmail.com>:
> > >
> > > > So, I totally agree about dag level slas. It's very important to have
> > it
> > > > and according to Sung Yun proposal it should be implemented not on
> the
> > > > scheduler job level.
> > > >
> > > > Regarding the second way of determining SLA: <task state STARTED> -->
> > > > ..<doesn't matter what happened>..  --> <task state SUCCESS>.
> > > > It's very helpful in the way when we want to achieve not technical
> SLA
> > > but
> > > > business SLA for the team which is using that DAG. Because between
> > those
> > > > two states anything could happen and at the end we might want to
> > > understand
> > > > high level SLA for the task. Because it doesn't matter for business I
> > > guess
> > > > that path of states of the task was something like: STARTED ->
> RUNNING
> > ->
> > > > FAILED -> RUNNING -> FAILED -> RUNNING -> SUCCESS. And in case when
> > > > something similar is happening it can be helpful to have an
> opportunity
> > > of automatically
> > > > recognizing  that the expected time for the task crossed the border.
> > > >
> > > > I agree that for the scheduler it can be too heavy. And also for that
> > > > purpose we need to have some process which is running in parallel
> with
> > > the
> > > > task. It can be one more job for example which is running on the same
> > > > machine as Scheduler, or not on the same.
> > > >
> > > >
> > > > About the third part of my proposal - time for the task in the
> > > > RUNNING state. I agree with you, Jarek. We can implement it on the
> task
> > > > level. For me it seems good.
> > > >
> > > > Yaro1
> > > >
> > > > вс, 18 июн. 2023 г. в 08:12, Jarek Potiuk <ja...@potiuk.com>:
> > > >
> > > >> I am also for DAG level SLA only (but maybe there are some twists).
> > > >>
> > > >> And I hope (since Sung Yun has not given up on that) - maybe that is
> > the
> > > >> right time that others here will chime in and maybe it will let the
> > vote
> > > >> go
> > > >> on? I think it would be great to get the SLA feature sorted out so
> > that
> > > we
> > > >> have a chance to stop answering ("yeah, we know SLA is broken, it
> has
> > > >> always been"). It would be nice to say "yeah the old deprecated SLA
> is
> > > >> broken, but we have this new mechanism(s) that replace it". The one
> > > >> proposed by Sung has a good chance of being such a replacement.
> > > >>
> > > >> I think having a task-level SLA managed by the Airflow framework
> might
> > > >> indeed be too costly and does not fit well in the current
> > architecture.
> > > I
> > > >> think attempting to monitor how long a given task runs by the
> > scheduler
> > > is
> > > >> simply a huge overkill. Generally speaking - scheduler (as
> surprising
> > it
> > > >> might be for anyone) does not monitor executing tasks (at least
> > > >> principally
> > > >> speaking). It merely submits the tasks to execute to executor and
> let
> > > >> executor handle all kinds of monitoring of what is being executed
> > when,
> > > >> and
> > > >> then - depending on the different types of executors there are
> various
> > > >> conditions when and how task is being executed, and various ways how
> > you
> > > >> can define different kinds of task SLAs. Or at least this is how I
> > think
> > > >> about the distributed nature of Airflow on a "logical" level. Once
> > task
> > > is
> > > >> queued for execution, the scheduler takes its hands off and turns
> its
> > > >> attention to tasks that are not yet scheduled and should be or tasks
> > > that
> > > >> are scheduled but not queued yet.
> > > >>
> > > >> But maybe some of the SLA "task" expectations can be  implemented
> in a
> > > >> limited version serving very limited cases on a task level?
> > > >>
> > > >> Referring to what Yaro1 wrote:
> > > >>
> > > >> > 1. It doesn't matter for us how long we are spending time on some
> > > >> specific
> > > >> task. It's important to have an understanding of the lag between
> > > >> execution_date of dag and success state for the task. We can call it
> > > >> dag_sla. It's similar to the current implementation of manage_slas.
> > > >>
> > > >> This is basically what Sung proposes, I believe.
> > > >>
> > > >>
> > > >> > 2. It's important to have an understanding and managing how long
> > some
> > > >> specific task is working. In my opinion working is the state between
> > > task
> > > >> last start_date and task first (after last start_date) SUCCESS
> state.
> > So
> > > >> for example for the task which is placed in FAILED state we still
> have
> > > to
> > > >> check an SLA in that strategy. We can call it task_sla.
> > > >>
> > > >> I am not sure if I understand it, but If I do, then this is the
> "super
> > > >> costly" SLA processing that we should likely avoid. I would love to
> > hear
> > > >> however, what are some specific use cases that we could show here,
> > maybe
> > > >> there are other ways we can achieve similar things.
> > > >>
> > > >>
> > > >> > 3. Sometimes we need to manage time for the task in the RUNNING
> > state.
> > > >> We
> > > >> can call it time_limit_sla.
> > > >>
> > > >> This can be IMHO implemented on the task level. We currently have
> > > timeout
> > > >> implemented this way - whenever we start the task, we can have a
> > signal
> > > >> handler registered with "real" time registered that will cancel the
> > > task.
> > > >> But I can imagine similar approach with signal and propagate the
> > > >> information that task exceeded the time it has been allocated but
> > would
> > > >> not
> > > >> stop it, just propagate the information (in a form of current way we
> > do
> > > >> callbacks for example, or maybe (even better) only run it in the
> > context
> > > >> of
> > > >> task  to signal "soft timeout" per task:
> > > >>
> > > >> >             signal.signal(signal.SIGALRM, self.handle_timeout)
> > > >> >            signal.setitimer(signal.ITIMER_REAL, self.seconds)
> > > >>
> > > >> This has an advantage that it is fully distributed - i.e. we do not
> > need
> > > >> anything to monitor 1000s of tasks running to decide if SLA has been
> > > >> breached. It's the task itself that will get the "soft" timeout and
> > > >> propagate it (and then whoever receives the callback can decide what
> > to
> > > do
> > > >> next - and this "callback" can happen in either the task context or
> it
> > > >> could be done in a DagFileProcessor context as we do currently -
> > though
> > > >> the
> > > >> in-task processing seems much more distributed and scalable in
> nature.
> > > >> There is one watch-out here that this is not **guaranteed** to work,
> > > there
> > > >> are cases, that we already saw that the SIGALRM is not going to be
> > > handled
> > > >> locally, when the task uses long running C-level function that is
> not
> > > >> written in the way to react to signals generated in Python (thinks
> > > >> low-level long-running Pandas c-method call that does not check
> signal
> > > in
> > > >> a
> > > >> long-running-loop. That however probably could be handled by one
> more
> > > >> process fork and have a dedicated child process that would monitor
> > > running
> > > >> tasks from a separate process - and we could actually improve both
> > > timeout
> > > >> and SLA handling by introducing such extra forked process to handle
> > > >> timeout/task level time_limit_sla, so IMHO this is an opportunity to
> > > >> improve things.
> > > >>
> > > >> I would love to hear what others think about it :)? I think our SLA
> > for
> > > >> fixing SLA is about to run out.
> > > >>
> > > >>
> > > >> J.
> > > >>
> > > >>
> > > >> On Thu, Jun 15, 2023 at 4:05 PM Sung Yun <sy...@cornell.edu> wrote:
> > > >>
> > > >> > Hello!
> > > >> >
> > > >> > Thank you very much for the feedback on the proposal. I’ve been
> > hoping
> > > >> to
> > > >> > get some more traction on this proposal, so it’s great to hear
> from
> > > >> another
> > > >> > user of the feature.
> > > >> >
> > > >> > I understand that there’s a lot of support for keeping a native
> task
> > > >> level
> > > >> > SLA feature, and I definitely agree with that sentiment. Our
> > > >> organization
> > > >> > very much relies on Airflow to evaluate ‘task_sla’ in order to
> keep
> > > >> track
> > > >> > of which tasks in each dags failed to succeed by an expected time.
> > > >> >
> > > >> > In the AIP I put together on the Confluence page, and in the
> Google
> > > >> docs,
> > > >> > I have identified why the existing implementation of the task
> level
> > > SLA
> > > >> > feature can be problematic and is often misleading for Airflow
> > users.
> > > >> The
> > > >> > feature is also quite costly for Airflow scheduler and
> > dag_processor.
> > > >> >
> > > >> > In that sense, the discussion is not about whether or not these
> SLA
> > > >> > features are important to the users, but much more technical. Can
> a
> > > >> > task-level feature be supported in a first-class way as a core
> > feature
> > > >> of
> > > >> > Airflow, or should it be implemented by the users, for example as
> > > >> > independent tasks by leveraging Deferrable Operators.
> > > >> >
> > > >> > My current thought is that only Dag level SLAs can be supported
> in a
> > > >> > non-disruptive way by the scheduler, and that task level SLAs
> should
> > > be
> > > >> > handled outside of core Airflow infrastructure code. If you
> strongly
> > > >> > believe otherwise, I think it would be helpful if you could
> propose
> > an
> > > >> > alternative technical solution that solves many of the existing
> > > >> problems in
> > > >> > the task-level SLA feature.
> > > >> >
> > > >> > Sent from my iPhone
> > > >> >
> > > >> > > On Jun 13, 2023, at 1:10 PM, Ярослав Поскряков <
> > > >> > yaroslavposkrya...@gmail.com> wrote:
> > > >> > >
> > > >> > > Mechanism of SLA
> > > >> > >
> > > >> > > Hi, I read the previous conversation regarding SLA and I think
> > > >> removing
> > > >> > the
> > > >> > > opportunity to set sla for the task level will be a big mistake.
> > > >> > > So, the proposed implementation of the task level SLA will not
> be
> > > >> working
> > > >> > > correctly.
> > > >> > >
> > > >> > > That's why I guess we have to think about the mechanism of using
> > > SLA.
> > > >> > >
> > > >> > > I guess we should check three different cases in general.
> > > >> > >
> > > >> > >
> > > >> > > 1. It doesn't matter for us how long we are spending time on
> some
> > > >> > specific
> > > >> > > task. It's important to have an understanding of the lag between
> > > >> > > execution_date of dag and success state for the task. We can
> call
> > it
> > > >> > > dag_sla. It's similar to the current implementation of
> > manage_slas.
> > > >> > >
> > > >> > >
> > > >> > > 2. It's important to have an understanding and managing how long
> > > some
> > > >> > > specific task is working. In my opinion working is the state
> > between
> > > >> task
> > > >> > > last start_date and task first (after last start_date) SUCCESS
> > > state.
> > > >> So
> > > >> > > for example for the task which is placed in FAILED state we
> still
> > > >> have to
> > > >> > > check an SLA in that strategy. We can call it task_sla.
> > > >> > >
> > > >> > >
> > > >> > > 3. Sometimes we need to manage time for the task in the RUNNING
> > > >> state. We
> > > >> > > can call it time_limit_sla.
> > > >> > >
> > > >> > >
> > > >> > > Those three types of SLA will cover all possible cases.
> > > >> > >
> > > >> > >
> > > >> > > So we will have three different strategies for SLA.
> > > >> > >
> > > >> > >
> > > >> > > I guess we can use for dag_sla that idea -
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals
> > > >> > >
> > > >> > >
> > > >> > > For task_sla and time_limit_sla I prefer to stay with using
> > > >> SchedulerJob
> > > >> > >
> > > >> > >
> > > >> > > Github: Yaro1
> > > >> >
> > > >> >
> > ---------------------------------------------------------------------
> > > >> > To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
> > > >> > For additional commands, e-mail: dev-h...@airflow.apache.org
> > > >> >
> > > >> >
> > > >>
> > > >
> > >
> >
>
>
> --
> Sung Yun
> Cornell Tech '20
> Master of Engineering in Computer Science
>

Reply via email to