Hi Jarek, I've been mulling over the implementation of (3) task:
time_limit_sla, and I have some follow up questions about the
implementation.

Which forking strategy are we exactly proposing? Currently, we invoke
task.execute_callable within the taskinstance, which we can effectively
think of as the parent process for the sake of this discussion.

Are we proposing:
Structure 1
parent: task.execute_callable
└ child 1: sla timer
└ child 2: execution_timeout timer

Or:
Structure 2
parent: looping process that parses signals from child Processes
└ child 1: sla timer
└ child 2: execution_timeout timer
└ child 3: task.execute_callable

And also, are we proposing that the callbacks be executed in the child
processes (when the timers complete) or in the parent process?

Pierre: great questions...

> How hard would it be to spawn them when a task run with SLA configured as
a
normal workload on the worker ?
> Maybe on a dedicated queue / worker ?

My current thought is that having a well-abstracted subclass implementation
of Deferrable Operator may make the most sense for now. I worry that having
a configuration-driven way of creating sla monitoring tasks, where they are
created behind the scenes, would create confusion in the user base.
Especially so, if there is no dedicated worker pool that will completely
isolate the monitoring tasks from the resource pool of normal tasks. So I'm
curious to hear what options we would have in setting up a dedicated worker
pool to compliment this idea.

Sung

On Tue, Jun 20, 2023 at 2:08 PM Pierre Jeambrun <[email protected]>
wrote:

> This task_sla is more and more making me think of a ‘task’ on its own. It
> would need to be run in parallel, non blocking, not overlap between each
> other, etc…
>
> How hard would it be to spawn them when a task run with SLA configured as a
> normal workload on the worker ?
> Maybe on a dedicated queue / worker ?
>
> On Tue 20 Jun 2023 at 16:47, Sung Yun <[email protected]> wrote:
>
> > Thank you all for your continued engagement and input! It looks like
> > Iaroslav's layout of 3 different labels of SLA's is helping us group the
> > implementation into different categories, so I will organize my own
> > responses in those logical groupings as well.
> >
> > 1. dag_sla
> > 2. task_sla
> > 3. task: time_limit_sla
> >
> > 1. dag_sla
> > I am going to lean in on Jarek's support in driving us to agree on the
> fact
> > that, dag_sla seems like the only one that can stay within the scheduler
> > without incurring an excessive burden on the core infrastructure.
> >
> > > So, I totally agree about dag level slas. It's very important to have
> it
> > and according to Sung Yun proposal it should be implemented not on the
> > scheduler job level.
> >
> > In response to this, I want to clarify that I am specifically
> highlighting
> > that dag_sla is the only one that can be supported by the scheduler job.
> > Dag_sla isn't a feature that exists right now, and my submission proposes
> > exactly this!
> >
> > 2. task_sla
> > I think Utkarsh's response really helped highlight another compounding
> > issue with SLAs in Airflow, which is that users have such varying
> > definition of SLAs, and what they want to do when that SLA is breached.
> > On a high level, task_sla relies on a relationship between the dag_run,
> and
> > a specific task within that specific dag_run: it is the time between a
> > dag_run's scheduled start time, and the actual start or end time of an
> > individual task within that run.
> > Hence, it is impossible for it to be computed in a distributed way that
> > address all of the issues highlighted in the AIP, and needs to be managed
> > by a central process that has access to the single source of truth.
> > As Utkarsh suggests, I think this is perhaps doable as a separate
> process,
> > and probably would be much safer to do it within a separate process.
> > My only concern is that we would be introducing a separate Airflow
> process,
> > that is strictly optional, but one that requires quite a large amount of
> > investment in designing the right abstractions to meet user satisfaction
> > and reliability guarantees.
> > It will also require us to review the database's dag/dag_run/task tables'
> > indexing model to make sure that continuous queries to the database will
> > not overload it.
> > This isn't simple, because we will have to select tasks in any state
> > (FAILED, SUCCESS or RUNNING) that has not yet had their SLA evaluated,
> from
> > any dagRun (FAILED, SUCCESS or RUNNING), in order to make sure we don't
> > miss any tasks - because in this paradigm, the concept of SLA triggering
> is
> > decoupled from a dagrun or task execution.
> > A query that selects tasks in ANY state in ANY state of dag_run is bound
> to
> > be incredibly expensive - and I discuss this challenge in the Confluence
> > AIP and the Google Doc.
> > This will possibly be even more difficult to achieve, because we should
> > have the capacity to support multiple processes since we now support High
> > Availability in Airflow.
> > So although setting up a separate process decouples the SLA evaluation
> from
> > the scheduler, we need to acknowledge that we may be introducing a heavy
> > dependency on the metadata database.
> >
> > My suggestion to leverage the existing Triggerer process to design
> > monitoring Deferrable Operators to execute SLA callbacks has the benefit
> of
> > reducing the load on the database while achieving similar goals, because
> it
> > registers the SLA monitoring operator as a TASK to the dag_run that it is
> > associated with, and prevents the dag_run from completing if the SLA has
> > not yet computed. This means that our query will be strictly limited to
> > just the dagRuns in RUNNING state - this is a HUGE difference from having
> > to query dagruns in all states in a separate process, because we are
> merely
> > attaching a few additional tasks to be executed into existing dag_runs.
> >
> > In summary: I'm open to this idea, I just have not been able to think of
> a
> > way to manage this without overloading the scheduler, or the database.
> >
> > 3. task: time_limit_sla
> > Jarek: That sounds like a great idea that we could group into this AIP -
> I
> > will make some time to add some code snippets into the AIP to make this
> > idea a bit clearer to everyone reading it in preparation for the vote
> >
> >
> > Sung
> >
> > On Sun, Jun 18, 2023 at 9:38 PM utkarsh sharma <[email protected]>
> > wrote:
> >
> > > >
> > > > This can be IMHO implemented on the task level. We currently have
> > timeout
> > > > implemented this way - whenever we start the task, we can have a
> signal
> > > > handler registered with "real" time registered that will cancel the
> > task.
> > > > But I can imagine similar approach with signal and propagate the
> > > > information that task exceeded the time it has been allocated but
> would
> > > not
> > > > stop it, just propagate the information (in a form of current way we
> do
> > > > callbacks for example, or maybe (even better) only run it in the
> > context
> > > of
> > > > task  to signal "soft timeout" per task:
> > > >
> > > > >             signal.signal(signal.SIGALRM, self.handle_timeout)
> > > > >            signal.setitimer(signal.ITIMER_REAL, self.seconds)
> > > >
> > > > This has an advantage that it is fully distributed - i.e. we do not
> > need
> > > > anything to monitor 1000s of tasks running to decide if SLA has been
> > > > breached. It's the task itself that will get the "soft" timeout and
> > > > propagate it (and then whoever receives the callback can decide what
> to
> > > do
> > > > next - and this "callback" can happen in either the task context or
> it
> > > > could be done in a DagFileProcessor context as we do currently -
> though
> > > the
> > > > in-task processing seems much more distributed and scalable in
> nature.
> > > > There is one watch-out here that this is not **guaranteed** to work,
> > > there
> > > > are cases, that we already saw that the SIGALRM is not going to be
> > > handled
> > > > locally, when the task uses long running C-level function that is not
> > > > written in the way to react to signals generated in Python (thinks
> > > > low-level long-running Pandas c-method call that does not check
> signal
> > > in a
> > > > long-running-loop. That however probably could be handled by one more
> > > > process fork and have a dedicated child process that would monitor
> > > running
> > > > tasks from a separate process - and we could actually improve both
> > > timeout
> > > > and SLA handling by introducing such extra forked process to handle
> > > > timeout/task level time_limit_sla, so IMHO this is an opportunity to
> > > > improve things.
> > > >
> > >
> > >
> > > Building on what Jarek mentioned, If we can enable the scheduler to
> emit
> > > events for DAGs with SLA configured in cases of
> > > 1. DAG starts executing
> > > 2. Task - start executing(for every task)
> > > 3. Task - stop executing(for every task)
> > > 4. DAG stops executing
> > >
> > > And have a separate process(per dag run) that can keep monitoring such
> > > events and execute a callback in the following circumstances:
> > > 1. DAG level SLA miss
> > >     - When the entire DAG didn't finish in a specific time
> > > 2. Task-level SLA miss
> > >     - Counting time from the start of the DAG to the end of a task.
> > >     - Start of a task to end of a task.
> > >
> > > I think above approch should be addressing issues listed in AIP-57
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-57+SLA+as+a+DAG-Level+Feature
> > > >
> > > 1. Since we have a septate process we no longer have to wait for the
> > > Tasks/DAG to be in the SUCCESS/SKIPPED state or any other terminal
> state.
> > > In this process, we can have a loop executing periodically in intervals
> > of
> > > ex- 1sec to monitor SLA misses by monitoring events data and
> > task_instance
> > > table.
> > > 2. For Manually/Dataset triggered dags, we no longer have a dependency
> > on a
> > > fixed schedule, everything we need to evaluate SLA miss is already
> > present
> > > in Events for that specific DAG.
> > > 3. This approach also enables us to run callbacks on the task level.
> > > 4. We can remove calls to sla_miss_callbacks every time is called
> > > *dag_run.update_state*
> > >
> > > A couple of things I'm not about are -
> > > 1. Where to execute the callbacks. Executing a callback in the same
> > process
> > > as the monitoring process can have a downside if the callback takes
> much
> > > time to execute it will probably cause other SLA callbacks to be
> delayed.
> > > 2. Context of execution of callback, we have to maintain the same
> context
> > > in which the callback is defined.
> > >
> > > Would love to know other people's thoughts on this :)
> > >
> > > Thanks,
> > > Utkarsh Sharma
> > >
> > >
> > > On Sun, Jun 18, 2023 at 4:08 PM Iaroslav Poskriakov <
> > > [email protected]> wrote:
> > >
> > > > I want to say that airflow is a very popular project and the ways of
> > > > calculating SLA are different. Because of different business cases.
> And
> > > if
> > > > it's possible we should make most of them from the box.
> > > >
> > > > вс, 18 июн. 2023 г. в 13:30, Iaroslav Poskriakov <
> > > > [email protected]>:
> > > >
> > > > > So, I totally agree about dag level slas. It's very important to
> have
> > > it
> > > > > and according to Sung Yun proposal it should be implemented not on
> > the
> > > > > scheduler job level.
> > > > >
> > > > > Regarding the second way of determining SLA: <task state STARTED>
> -->
> > > > > ..<doesn't matter what happened>..  --> <task state SUCCESS>.
> > > > > It's very helpful in the way when we want to achieve not technical
> > SLA
> > > > but
> > > > > business SLA for the team which is using that DAG. Because between
> > > those
> > > > > two states anything could happen and at the end we might want to
> > > > understand
> > > > > high level SLA for the task. Because it doesn't matter for
> business I
> > > > guess
> > > > > that path of states of the task was something like: STARTED ->
> > RUNNING
> > > ->
> > > > > FAILED -> RUNNING -> FAILED -> RUNNING -> SUCCESS. And in case when
> > > > > something similar is happening it can be helpful to have an
> > opportunity
> > > > of automatically
> > > > > recognizing  that the expected time for the task crossed the
> border.
> > > > >
> > > > > I agree that for the scheduler it can be too heavy. And also for
> that
> > > > > purpose we need to have some process which is running in parallel
> > with
> > > > the
> > > > > task. It can be one more job for example which is running on the
> same
> > > > > machine as Scheduler, or not on the same.
> > > > >
> > > > >
> > > > > About the third part of my proposal - time for the task in the
> > > > > RUNNING state. I agree with you, Jarek. We can implement it on the
> > task
> > > > > level. For me it seems good.
> > > > >
> > > > > Yaro1
> > > > >
> > > > > вс, 18 июн. 2023 г. в 08:12, Jarek Potiuk <[email protected]>:
> > > > >
> > > > >> I am also for DAG level SLA only (but maybe there are some
> twists).
> > > > >>
> > > > >> And I hope (since Sung Yun has not given up on that) - maybe that
> is
> > > the
> > > > >> right time that others here will chime in and maybe it will let
> the
> > > vote
> > > > >> go
> > > > >> on? I think it would be great to get the SLA feature sorted out so
> > > that
> > > > we
> > > > >> have a chance to stop answering ("yeah, we know SLA is broken, it
> > has
> > > > >> always been"). It would be nice to say "yeah the old deprecated
> SLA
> > is
> > > > >> broken, but we have this new mechanism(s) that replace it". The
> one
> > > > >> proposed by Sung has a good chance of being such a replacement.
> > > > >>
> > > > >> I think having a task-level SLA managed by the Airflow framework
> > might
> > > > >> indeed be too costly and does not fit well in the current
> > > architecture.
> > > > I
> > > > >> think attempting to monitor how long a given task runs by the
> > > scheduler
> > > > is
> > > > >> simply a huge overkill. Generally speaking - scheduler (as
> > surprising
> > > it
> > > > >> might be for anyone) does not monitor executing tasks (at least
> > > > >> principally
> > > > >> speaking). It merely submits the tasks to execute to executor and
> > let
> > > > >> executor handle all kinds of monitoring of what is being executed
> > > when,
> > > > >> and
> > > > >> then - depending on the different types of executors there are
> > various
> > > > >> conditions when and how task is being executed, and various ways
> how
> > > you
> > > > >> can define different kinds of task SLAs. Or at least this is how I
> > > think
> > > > >> about the distributed nature of Airflow on a "logical" level. Once
> > > task
> > > > is
> > > > >> queued for execution, the scheduler takes its hands off and turns
> > its
> > > > >> attention to tasks that are not yet scheduled and should be or
> tasks
> > > > that
> > > > >> are scheduled but not queued yet.
> > > > >>
> > > > >> But maybe some of the SLA "task" expectations can be  implemented
> > in a
> > > > >> limited version serving very limited cases on a task level?
> > > > >>
> > > > >> Referring to what Yaro1 wrote:
> > > > >>
> > > > >> > 1. It doesn't matter for us how long we are spending time on
> some
> > > > >> specific
> > > > >> task. It's important to have an understanding of the lag between
> > > > >> execution_date of dag and success state for the task. We can call
> it
> > > > >> dag_sla. It's similar to the current implementation of
> manage_slas.
> > > > >>
> > > > >> This is basically what Sung proposes, I believe.
> > > > >>
> > > > >>
> > > > >> > 2. It's important to have an understanding and managing how long
> > > some
> > > > >> specific task is working. In my opinion working is the state
> between
> > > > task
> > > > >> last start_date and task first (after last start_date) SUCCESS
> > state.
> > > So
> > > > >> for example for the task which is placed in FAILED state we still
> > have
> > > > to
> > > > >> check an SLA in that strategy. We can call it task_sla.
> > > > >>
> > > > >> I am not sure if I understand it, but If I do, then this is the
> > "super
> > > > >> costly" SLA processing that we should likely avoid. I would love
> to
> > > hear
> > > > >> however, what are some specific use cases that we could show here,
> > > maybe
> > > > >> there are other ways we can achieve similar things.
> > > > >>
> > > > >>
> > > > >> > 3. Sometimes we need to manage time for the task in the RUNNING
> > > state.
> > > > >> We
> > > > >> can call it time_limit_sla.
> > > > >>
> > > > >> This can be IMHO implemented on the task level. We currently have
> > > > timeout
> > > > >> implemented this way - whenever we start the task, we can have a
> > > signal
> > > > >> handler registered with "real" time registered that will cancel
> the
> > > > task.
> > > > >> But I can imagine similar approach with signal and propagate the
> > > > >> information that task exceeded the time it has been allocated but
> > > would
> > > > >> not
> > > > >> stop it, just propagate the information (in a form of current way
> we
> > > do
> > > > >> callbacks for example, or maybe (even better) only run it in the
> > > context
> > > > >> of
> > > > >> task  to signal "soft timeout" per task:
> > > > >>
> > > > >> >             signal.signal(signal.SIGALRM, self.handle_timeout)
> > > > >> >            signal.setitimer(signal.ITIMER_REAL, self.seconds)
> > > > >>
> > > > >> This has an advantage that it is fully distributed - i.e. we do
> not
> > > need
> > > > >> anything to monitor 1000s of tasks running to decide if SLA has
> been
> > > > >> breached. It's the task itself that will get the "soft" timeout
> and
> > > > >> propagate it (and then whoever receives the callback can decide
> what
> > > to
> > > > do
> > > > >> next - and this "callback" can happen in either the task context
> or
> > it
> > > > >> could be done in a DagFileProcessor context as we do currently -
> > > though
> > > > >> the
> > > > >> in-task processing seems much more distributed and scalable in
> > nature.
> > > > >> There is one watch-out here that this is not **guaranteed** to
> work,
> > > > there
> > > > >> are cases, that we already saw that the SIGALRM is not going to be
> > > > handled
> > > > >> locally, when the task uses long running C-level function that is
> > not
> > > > >> written in the way to react to signals generated in Python (thinks
> > > > >> low-level long-running Pandas c-method call that does not check
> > signal
> > > > in
> > > > >> a
> > > > >> long-running-loop. That however probably could be handled by one
> > more
> > > > >> process fork and have a dedicated child process that would monitor
> > > > running
> > > > >> tasks from a separate process - and we could actually improve both
> > > > timeout
> > > > >> and SLA handling by introducing such extra forked process to
> handle
> > > > >> timeout/task level time_limit_sla, so IMHO this is an opportunity
> to
> > > > >> improve things.
> > > > >>
> > > > >> I would love to hear what others think about it :)? I think our
> SLA
> > > for
> > > > >> fixing SLA is about to run out.
> > > > >>
> > > > >>
> > > > >> J.
> > > > >>
> > > > >>
> > > > >> On Thu, Jun 15, 2023 at 4:05 PM Sung Yun <[email protected]>
> wrote:
> > > > >>
> > > > >> > Hello!
> > > > >> >
> > > > >> > Thank you very much for the feedback on the proposal. I’ve been
> > > hoping
> > > > >> to
> > > > >> > get some more traction on this proposal, so it’s great to hear
> > from
> > > > >> another
> > > > >> > user of the feature.
> > > > >> >
> > > > >> > I understand that there’s a lot of support for keeping a native
> > task
> > > > >> level
> > > > >> > SLA feature, and I definitely agree with that sentiment. Our
> > > > >> organization
> > > > >> > very much relies on Airflow to evaluate ‘task_sla’ in order to
> > keep
> > > > >> track
> > > > >> > of which tasks in each dags failed to succeed by an expected
> time.
> > > > >> >
> > > > >> > In the AIP I put together on the Confluence page, and in the
> > Google
> > > > >> docs,
> > > > >> > I have identified why the existing implementation of the task
> > level
> > > > SLA
> > > > >> > feature can be problematic and is often misleading for Airflow
> > > users.
> > > > >> The
> > > > >> > feature is also quite costly for Airflow scheduler and
> > > dag_processor.
> > > > >> >
> > > > >> > In that sense, the discussion is not about whether or not these
> > SLA
> > > > >> > features are important to the users, but much more technical.
> Can
> > a
> > > > >> > task-level feature be supported in a first-class way as a core
> > > feature
> > > > >> of
> > > > >> > Airflow, or should it be implemented by the users, for example
> as
> > > > >> > independent tasks by leveraging Deferrable Operators.
> > > > >> >
> > > > >> > My current thought is that only Dag level SLAs can be supported
> > in a
> > > > >> > non-disruptive way by the scheduler, and that task level SLAs
> > should
> > > > be
> > > > >> > handled outside of core Airflow infrastructure code. If you
> > strongly
> > > > >> > believe otherwise, I think it would be helpful if you could
> > propose
> > > an
> > > > >> > alternative technical solution that solves many of the existing
> > > > >> problems in
> > > > >> > the task-level SLA feature.
> > > > >> >
> > > > >> > Sent from my iPhone
> > > > >> >
> > > > >> > > On Jun 13, 2023, at 1:10 PM, Ярослав Поскряков <
> > > > >> > [email protected]> wrote:
> > > > >> > >
> > > > >> > > Mechanism of SLA
> > > > >> > >
> > > > >> > > Hi, I read the previous conversation regarding SLA and I think
> > > > >> removing
> > > > >> > the
> > > > >> > > opportunity to set sla for the task level will be a big
> mistake.
> > > > >> > > So, the proposed implementation of the task level SLA will not
> > be
> > > > >> working
> > > > >> > > correctly.
> > > > >> > >
> > > > >> > > That's why I guess we have to think about the mechanism of
> using
> > > > SLA.
> > > > >> > >
> > > > >> > > I guess we should check three different cases in general.
> > > > >> > >
> > > > >> > >
> > > > >> > > 1. It doesn't matter for us how long we are spending time on
> > some
> > > > >> > specific
> > > > >> > > task. It's important to have an understanding of the lag
> between
> > > > >> > > execution_date of dag and success state for the task. We can
> > call
> > > it
> > > > >> > > dag_sla. It's similar to the current implementation of
> > > manage_slas.
> > > > >> > >
> > > > >> > >
> > > > >> > > 2. It's important to have an understanding and managing how
> long
> > > > some
> > > > >> > > specific task is working. In my opinion working is the state
> > > between
> > > > >> task
> > > > >> > > last start_date and task first (after last start_date) SUCCESS
> > > > state.
> > > > >> So
> > > > >> > > for example for the task which is placed in FAILED state we
> > still
> > > > >> have to
> > > > >> > > check an SLA in that strategy. We can call it task_sla.
> > > > >> > >
> > > > >> > >
> > > > >> > > 3. Sometimes we need to manage time for the task in the
> RUNNING
> > > > >> state. We
> > > > >> > > can call it time_limit_sla.
> > > > >> > >
> > > > >> > >
> > > > >> > > Those three types of SLA will cover all possible cases.
> > > > >> > >
> > > > >> > >
> > > > >> > > So we will have three different strategies for SLA.
> > > > >> > >
> > > > >> > >
> > > > >> > > I guess we can use for dag_sla that idea -
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals
> > > > >> > >
> > > > >> > >
> > > > >> > > For task_sla and time_limit_sla I prefer to stay with using
> > > > >> SchedulerJob
> > > > >> > >
> > > > >> > >
> > > > >> > > Github: Yaro1
> > > > >> >
> > > > >> >
> > > ---------------------------------------------------------------------
> > > > >> > To unsubscribe, e-mail: [email protected]
> > > > >> > For additional commands, e-mail: [email protected]
> > > > >> >
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
> >
> > --
> > Sung Yun
> > Cornell Tech '20
> > Master of Engineering in Computer Science
> >
>


-- 
Sung Yun
Cornell Tech '20
Master of Engineering in Computer Science

Reply via email to