syun64 opened a new pull request, #33532:
URL: https://github.com/apache/airflow/pull/33532

   closes #32816
   
   ### Summary
   
   This PR introduces Dag SLAs as a part of **AIP-57 Refactor SLA Feature**.
   
   After many discussions, the community voted to introduce new SLA 
features to replace the existing SLA feature, which has caused a lot of 
confusion and dissatisfaction among users over the years. The proposal 
introduces SLAs that are measured and contained within the lifetime of a Dag 
Run and within the lifetime of a Task Instance, which cost significantly less 
for the Airflow infrastructure to compute than the existing SLAs. 
The proposed SLA callback functions also accept the TI context as their 
argument, the same parameter used by the other callback types 
(on_success_callback, on_failure_callback, on_execute_callback), so that the 
interface stays consistent.
   
   ### Dag Updates
   #### Added 
           sla: timedelta | None = None
           on_sla_miss_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None
   
   #### Marked for Deprecation
           sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None
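
   As an illustration of the new parameter shape, here is a minimal sketch of how a value typed `None | callback | list[callback]` could be normalized before invocation. It does not import Airflow: the `DagStateChangeCallback` alias below is a local stand-in that assumes a callback taking a single context dict, and `normalize_callbacks` is a hypothetical helper, not a function from this PR.

```python
from __future__ import annotations

from typing import Any, Callable, Dict, List

# Local stand-ins (assumptions): a context is a plain dict, and a callback
# takes that context as its only argument.
Context = Dict[str, Any]
DagStateChangeCallback = Callable[[Context], None]


def normalize_callbacks(
    callbacks: None | DagStateChangeCallback | List[DagStateChangeCallback],
) -> List[DagStateChangeCallback]:
    """Return the callbacks as a list, whichever form they were given in."""
    if callbacks is None:
        return []
    if callable(callbacks):
        # A single callback was passed directly.
        return [callbacks]
    return list(callbacks)


def notify_on_sla_miss(context: Context) -> None:
    """Example callback: it only reads keys from the context it is given."""
    print(f"SLA missed for {context.get('dag_id', '<unknown>')}")
```

   A caller could then iterate over `normalize_callbacks(on_sla_miss_callback)` without caring whether the user supplied one function, a list, or nothing.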
   
   ### Dag Run DB Updates
   #### Added 
           sla_missed = Column(Boolean, default=False)
   
   ### DagCallbackRequest Updates (callback_data json payload)
   #### Added
           sla_miss: bool | None = False
           dagrun_state: DagRunState
   #### Removed
           is_failure_callback: bool | None = True
   
   ### Dag SLA Measurement Semantics 
   SLA tracking logic is evaluated when the active Dag Run is checked for 
updates in DagRun.update_state, which is executed in the main scheduling 
loop. If the **sla** parameter is provided in the Dag definition, the Dag SLA 
is tracked for all run types except DagRunType.BACKFILL_JOB. 
   
   If the **sla_missed** attribute of the Dag Run is already set to True, the 
scheduler loop skips the SLA check for that Dag Run. This condition ensures 
that the SLA miss callback is generated only once, at the point when a 
scheduler loop detects that the Dag Run has just missed its SLA and updates 
the corresponding Dag Run record in the DB.
   
   SLA is measured against the time elapsed between the Dag Run start_date 
and the current time. If that elapsed time exceeds the SLA, the SLA has been 
missed for the active Dag Run. Using the start_date (instead of the scheduled 
start time according to the timetable) keeps the measurement consistent 
across MANUAL and SCHEDULED run types, and keeps the SLA evaluation logic 
simple for the initial version of this feature.
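
   The measurement rules above can be sketched as a small, self-contained check. This is not the code in the PR: `RunType` and `Run` are hypothetical stand-ins for DagRunType and the Dag Run record, and `sla_just_missed` is an illustrative name. It assumes the check is skipped when no start_date is set yet.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum


class RunType(str, Enum):
    """Hypothetical stand-in for DagRunType."""

    SCHEDULED = "scheduled"
    MANUAL = "manual"
    BACKFILL_JOB = "backfill"


@dataclass
class Run:
    """Hypothetical stand-in for a Dag Run record."""

    run_type: RunType
    start_date: datetime | None
    sla_missed: bool = False


def sla_just_missed(run: Run, sla: timedelta | None, now: datetime) -> bool:
    """Return True only the first time the run is found to exceed its SLA."""
    # No SLA configured, or a backfill run: nothing to track.
    if sla is None or run.run_type is RunType.BACKFILL_JOB:
        return False
    # Already flagged (or not started yet): skip so the callback fires once.
    if run.sla_missed or run.start_date is None:
        return False
    # Missed when the time elapsed since start_date exceeds the SLA.
    if now - run.start_date > sla:
        run.sla_missed = True  # the real feature persists this flag in the DB
        return True
    return False


start = datetime(2023, 8, 1, tzinfo=timezone.utc)
run = Run(RunType.SCHEDULED, start)
first = sla_just_missed(run, timedelta(hours=1), start + timedelta(hours=2))
second = sla_just_missed(run, timedelta(hours=1), start + timedelta(hours=3))
print(first, second)  # True False
```

   The second call returns False because the flag set by the first call short-circuits the check, which is the once-only behavior described above.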
   
   An SLA callback is indicated by the sla_miss boolean flag on 
DagCallbackRequest. The state of the DagRun corresponding to the 
DagCallbackRequest is propagated through the dagrun_state parameter, which is 
also used to deduce whether an on_success_callback or an on_failure_callback 
must be executed along with the request. It is now possible for a 
DagCallbackRequest to be issued without a finished DagRun state (SUCCESS or 
FAILED); it is up to the dagrun.handle_callback function to parse the 
parameters and execute the appropriate callback for the given DagRun state or 
sla_miss flag.
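
   One way to read the dispatch rule above is sketched below. `CallbackRequest` and `select_callbacks` are hypothetical, simplified names rather than the PR's classes, and the state strings are assumed to match the values of DagRunState. The sketch also assumes that an SLA miss callback and a success or failure callback may both be selected for the same request.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Dict, List

Callback = Callable[[Dict[str, Any]], None]


@dataclass
class CallbackRequest:
    """Hypothetical, simplified view of the DagCallbackRequest payload."""

    dagrun_state: str  # assumed values: "queued", "running", "success", "failed"
    sla_miss: bool = False


def select_callbacks(
    request: CallbackRequest,
    on_success: List[Callback],
    on_failure: List[Callback],
    on_sla_miss: List[Callback],
) -> List[Callback]:
    """Pick the callbacks implied by the sla_miss flag and the run state."""
    selected: List[Callback] = []
    if request.sla_miss:
        selected.extend(on_sla_miss)
    # Only a finished state implies a success or failure callback.
    if request.dagrun_state == "success":
        selected.extend(on_success)
    elif request.dagrun_state == "failed":
        selected.extend(on_failure)
    return selected
```

   With this reading, a request carrying `dagrun_state="running"` and `sla_miss=True` selects only the SLA miss callbacks, which covers the new case of a request issued for an unfinished run.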
   
   ### Question:
   Do we need to consider the impact of removing is_failure_callback from 
DagCallbackRequest and replacing it with dagrun_state? If DbCallbackRequest is 
enabled and unexecuted DagCallbackRequests are stored in the DB at the time of 
DB upgrade, would it lead to some callbacks not being executed?
   EDIT: I've added an alembic migration step that unpacks the json 
callback_data and programmatically rewrites the payload using the logical 
mapping of is_failure_callback <-> dagrun_state, for DbCallbackRequest records 
whose callback_type is DagCallbackRequest. I wanted to get others' thoughts 
on whether this suggestion improves the reliability of the migration or makes 
it riskier.
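
   To make the payload rewrite concrete, here is a minimal sketch of the mapping applied to a single callback_data JSON string. It is not the migration in this PR: the function names are hypothetical, the database access is omitted, and the mapping (True to "failed", False to "success") is an assumption about what "logical mapping" means here.

```python
import json


def upgrade_payload(callback_data: str) -> str:
    """Replace is_failure_callback with dagrun_state (assumed mapping)."""
    data = json.loads(callback_data)
    if "is_failure_callback" in data:
        is_failure = data.pop("is_failure_callback")
        data["dagrun_state"] = "failed" if is_failure else "success"
        # Requests stored before the upgrade cannot be SLA miss requests.
        data.setdefault("sla_miss", False)
    return json.dumps(data)


def downgrade_payload(callback_data: str) -> str:
    """Restore is_failure_callback; lossy for states other than failed/success."""
    data = json.loads(callback_data)
    if "dagrun_state" in data:
        state = data.pop("dagrun_state")
        data.pop("sla_miss", None)
        data["is_failure_callback"] = state == "failed"
    return json.dumps(data)
```

   Note that the downgrade direction cannot represent an SLA miss request for a run that is still in progress: in this sketch it collapses to `is_failure_callback = False`, which is one of the risks the question above is asking about.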
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
