This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 657223c0fd Deferrable Operators Docs Edits (#33620)
657223c0fd is described below

commit 657223c0fd5460b1aa9fca8d96c2b4a17eb40ef9
Author: Laura Zdanski <25642903+lzdan...@users.noreply.github.com>
AuthorDate: Thu Dec 7 13:05:08 2023 -0500

    Deferrable Operators Docs Edits (#33620)
    
    Co-authored-by: Jed Cunningham 
<66968678+jedcunning...@users.noreply.github.com>
    Co-authored-by: Daniel Standish 
<15932138+dstand...@users.noreply.github.com>
---
 .../authoring-and-scheduling/deferring.rst         | 114 ++++++++++-----------
 1 file changed, 57 insertions(+), 57 deletions(-)

diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst 
b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
index c44ac2c1dd..adacf4b79d 100644
--- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -18,45 +18,48 @@
 Deferrable Operators & Triggers
 ===============================
 
-Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors 
<../core-concepts/sensors>` take up a full *worker slot* for the entire time 
they are running, even if they are idle; for example, if you only have 100 
worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor 
that's currently running but idle, then you *cannot run anything else* - even 
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for 
Sensors solves some of this, all [...]
+Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors 
<../core-concepts/sensors>` take up a full *worker slot* for the entire time 
they are running, even if they are idle. For example, if you only have 100 
worker slots available to run tasks, and you have 100 DAGs waiting on a sensor 
that's currently running but idle, then you *cannot run anything else* - even 
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for 
sensors solves some of this, by  [...]
 
-This is where *Deferrable Operators* come in. A deferrable operator is one 
that is written with the ability to suspend itself and free up the worker when 
it knows it has to wait, and hand off the job of resuming it to something 
called a *Trigger*. As a result, while it is suspended (deferred), it is not 
taking up a worker slot and your cluster will have a lot less resources wasted 
on idle Operators or Sensors. Note that by default deferred tasks will not use 
up pool slots, if you would l [...]
+This is where *Deferrable Operators* can be used. When it has nothing to do 
but wait, an operator can suspend itself and free up the worker for other 
processes by *deferring*. When an operator defers, execution moves to the 
triggerer, where the trigger specified by the operator will run. The trigger 
can do the polling or waiting required by the operator. Then, when the trigger 
finishes polling or waiting, it sends a signal for the operator to resume its 
execution. During the deferred ph [...]
 
-*Triggers* are small, asynchronous pieces of Python code designed to be run 
all together in a single Python process; because they are asynchronous, they 
are able to all co-exist efficiently. As an overview of how this process works:
+*Triggers* are small, asynchronous pieces of Python code designed to run in a 
single Python process. Because they are asynchronous, they can all co-exist 
efficiently in the *triggerer* Airflow component.
 
-* A task instance (running operator) gets to a point where it has to wait, and 
defers itself with a trigger tied to the event that should resume it. This 
frees up the worker to run something else.
-* The new Trigger instance is registered inside Airflow, and picked up by a 
*triggerer* process
-* The trigger is run until it fires, at which point its source task is 
re-scheduled
-* The scheduler queues the task to resume on a worker node
+An overview of how this process works:
 
-Using deferrable operators as a DAG author is almost transparent; writing 
them, however, takes a bit more work.
+* A task instance (running operator) reaches a point where it has to wait for 
other operations or conditions, and defers itself with a trigger tied to an 
event to resume it. This frees up the worker to run something else.
+* The new trigger instance is registered by Airflow, and picked up by a 
triggerer process.
+* The trigger runs until it fires, at which point its source task is 
re-scheduled by the scheduler.
+* The scheduler queues the task to resume on a worker node.
 
+You can either use pre-written deferrable operators as a DAG author or write 
your own. Writing them, however, requires that they meet certain design 
criteria.
 
 Using Deferrable Operators
 --------------------------
 
-If all you wish to do is use pre-written Deferrable Operators (such as 
``TimeSensorAsync``, which comes with Airflow), then there are only two steps 
you need:
+If you want to use pre-written deferrable operators that come with Airflow, 
such as ``TimeSensorAsync``, then you only need to complete two steps:
 
-* Ensure your Airflow installation is running at least one ``triggerer`` 
process, as well as the normal ``scheduler``
+* Ensure your Airflow installation runs at least one ``triggerer`` process, as 
well as the normal ``scheduler``
 * Use deferrable operators/sensors in your DAGs
 
-That's it; everything else will be automatically handled for you. If you're 
upgrading existing DAGs, we even provide some API-compatible sensor variants 
(e.g. ``TimeSensorAsync`` for ``TimeSensor``) that you can swap into your DAG 
with no other changes required.
+Airflow automatically handles the deferral process for you.
 
-Note that you cannot yet use the deferral ability from inside custom 
PythonOperator/TaskFlow Python functions; it is only available to traditional, 
class-based Operators at the moment.
+If you're upgrading existing DAGs to use deferrable operators, Airflow 
contains API-compatible sensor variants, like ``TimeSensorAsync`` for 
``TimeSensor``. Add these variants into your DAG to use deferrable operators 
with no other changes required.
+
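
For illustration, a minimal sketch of such a swap, assuming hypothetical DAG
and task ids (``TimeSensorAsync`` ships with Airflow in
``airflow.sensors.time_sensor``):

.. code-block:: python

    import datetime

    import pendulum

    from airflow import DAG
    from airflow.sensors.time_sensor import TimeSensorAsync

    with DAG(
        dag_id="example_deferrable_sensor",
        start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
        schedule=None,
    ) as dag:
        # Drop-in replacement for TimeSensor: waits for 09:00 without
        # occupying a worker slot while idle.
        wait_for_time = TimeSensorAsync(
            task_id="wait_for_time",
            target_time=datetime.time(9, 0),
        )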
+Note that you can't use the deferral ability from inside custom PythonOperator 
or TaskFlow Python functions. Deferral is only available to traditional, 
class-based operators.
 
 .. _deferring/writing:
 
 Writing Deferrable Operators
 ----------------------------
 
-Writing a deferrable operator takes a bit more work. There are some main 
points to consider:
+When writing a deferrable operator, these are the main points to consider:
 
-* Your Operator must defer itself with a Trigger. If there is a Trigger in 
core Airflow you can use, great; otherwise, you will have to write one.
-* Your Operator will be stopped and removed from its worker while deferred, 
and no state will persist automatically. You can persist state by asking 
Airflow to resume you at a certain method or pass certain kwargs, but that's it.
-* You can defer multiple times, and you can defer before/after your Operator 
does significant work, or only defer if certain conditions are met (e.g. a 
system does not have an immediate answer). Deferral is entirely under your 
control.
-* Any Operator can defer; no special marking on its class is needed, and it's 
not limited to Sensors.
-* In order for any changes to a Trigger to be reflected, the *triggerer* needs 
to be restarted whenever the Trigger is modified.
-* If you want to add an operator or sensor that supports both deferrable and 
non-deferrable modes, it's suggested to add ``deferrable: bool = 
conf.getboolean("operators", "default_deferrable", fallback=False)`` to the 
``__init__`` method of the operator and use it to decide whether to run the 
operator in deferrable mode. You'll be able to configure the default value of 
``deferrable`` of all the operators and sensors that support switching between 
deferrable and non-deferrable mode throug [...]
+* Your operator must defer itself with a trigger. You can use a trigger 
included in core Airflow, or you can write a custom one.
+* Your operator will be stopped and removed from its worker while deferred, 
and no state persists automatically. You can persist state by instructing 
Airflow to resume the operator at a certain method or by passing certain kwargs.
+* You can defer multiple times, and you can defer before or after your 
operator does significant work. Or, you can defer only if certain conditions 
are met, for example, if a system does not have an immediate answer. Deferral 
is entirely under your control.
+* Any operator can defer; no special marking on its class is needed, and it's 
not limited to sensors.
+* The *triggerer* needs to be restarted whenever a trigger is modified, in 
order for the changes to be reflected.
+* If you want to add an operator or sensor that supports both deferrable and 
non-deferrable modes, it's suggested to add ``deferrable: bool = 
conf.getboolean("operators", "default_deferrable", fallback=False)`` to the 
``__init__`` method of the operator and use it to decide whether to run the 
operator in deferrable mode. You can configure the default value of 
``deferrable`` for all the operators and sensors that support switching between 
deferrable and non-deferrable mode through ``defau [...]
 
 .. code-block:: python
 
@@ -97,22 +100,22 @@ Writing a deferrable operator takes a bit more work. There are some main points
 Triggering Deferral
 ~~~~~~~~~~~~~~~~~~~
 
-If you want to trigger deferral, at any place in your Operator you can call 
``self.defer(trigger, method_name, kwargs, timeout)``, which will raise a 
special exception that Airflow will catch. The arguments are:
+If you want to trigger deferral, at any place in your operator, you can call 
``self.defer(trigger, method_name, kwargs, timeout)``. This raises a special 
exception that Airflow catches. The arguments are:
 
-* ``trigger``: An instance of a Trigger that you wish to defer on. It will be 
serialized into the database.
-* ``method_name``: The method name on your Operator you want Airflow to call 
when it resumes.
-* ``kwargs``: Additional keyword arguments to pass to the method when it is 
called. Optional, defaults to ``{}``.
-* ``timeout``: A timedelta that specifies a timeout after which this deferral 
will fail, and fail the task instance. Optional, defaults to ``None``, meaning 
no timeout.
+* ``trigger``: An instance of a trigger that you want to defer to. It will be 
serialized into the database.
+* ``method_name``: The method name on your operator that you want Airflow to 
call when it resumes.
+* ``kwargs``: (Optional) Additional keyword arguments to pass to the method 
when it is called. Defaults to ``{}``.
+* ``timeout``: (Optional) A timedelta that specifies a timeout after which 
this deferral will fail, and fail the task instance. Defaults to ``None``, 
which means no timeout.
 
-When you opt to defer, your Operator will *stop executing at that point and be 
removed from its current worker*. No state - such as local variables, or 
attributes set on ``self`` - will persist, and when your Operator is resumed it 
will be a *brand new instance* of it. The only way you can pass state from the 
old instance of the Operator to the new one is via ``method_name`` and 
``kwargs``.
+When you opt to defer, your operator will stop executing at that point and be 
removed from its current worker. No state will persist, such as local variables 
or attributes set on ``self``. When your operator resumes, it resumes as a 
brand new instance of the operator. The only way you can pass state from the 
old instance to the new one is with ``method_name`` and ``kwargs``.
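
As a sketch, assuming a hypothetical operator, passing state forward through
``method_name`` and ``kwargs`` looks roughly like this:

.. code-block:: python

    from datetime import timedelta

    from airflow.models.baseoperator import BaseOperator
    from airflow.triggers.temporal import TimeDeltaTrigger


    class HypotheticalWaitOperator(BaseOperator):
        def execute(self, context):
            attempt = 1
            # Suspend this task and free the worker. Airflow calls
            # ``execute_complete`` with the saved kwargs when the trigger fires.
            self.defer(
                trigger=TimeDeltaTrigger(timedelta(minutes=5)),
                method_name="execute_complete",
                kwargs={"attempt": attempt},
            )

        def execute_complete(self, context, event=None, attempt=None):
            # ``attempt`` survived the deferral because it was passed in kwargs.
            self.log.info("Resumed (attempt %s) with event %s", attempt, event)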
 
-When your Operator is resumed, an ``event`` item will be added to the kwargs 
passed to the ``method_name`` method. The ``event`` object contains the payload 
from the trigger event that resumed your Operator. Depending on the trigger, 
this may be useful to your operator (e.g. it's a status code or URL to fetch 
results), or it may not be important (it's just a datetime). Your 
``method_name`` method, however, *must* accept ``event`` as a keyword argument.
+When your operator resumes, Airflow adds an ``event`` object to the kwargs 
passed to the ``method_name`` method. This ``event`` object contains the 
payload from the trigger event that resumed your operator. Depending on the 
trigger, this can be useful to your operator, for example, a status code or a 
URL to fetch results. Or, it might be unimportant information, like a 
datetime. Your 
``method_name`` method, however, *must* accept ``event`` as a keyword argument.
 
-If your Operator returns from either its first ``execute()`` method when it's 
new, or a subsequent method specified by ``method_name``, it will be considered 
complete and will finish executing.
+If your operator returns from either its first ``execute()`` method when it's 
new, or a subsequent method specified by ``method_name``, it will be considered 
complete and finish executing.
 
-You are free to set ``method_name`` to ``execute`` if you want your Operator 
to have one entrypoint, but it, too, will have to accept ``event`` as an 
optional keyword argument.
+You can set ``method_name`` to ``execute`` if you want your operator to have 
one entrypoint, but it must also accept ``event`` as an optional keyword 
argument.
 
-Here's a basic example of how a sensor might trigger deferral
+Here's a basic example of how a sensor might trigger deferral:
 
 .. code-block:: python
 
@@ -132,36 +135,35 @@ Here's a basic example of how a sensor might trigger deferral
             # We have no more work to do here. Mark as complete.
             return
 
-This Sensor is literally just a thin wrapper around the Trigger, so all it 
does is defer to the trigger, and specify a different method to come back to 
when the trigger fires - which, as it returns immediately, marks the Sensor as 
successful.
-
-Under the hood, ``self.defer`` raises the ``TaskDeferred`` exception, so it 
will work anywhere inside your Operator's code, even buried many nested calls 
deep inside ``execute()``. You are free to raise ``TaskDeferred`` manually if 
you wish; it takes the same arguments as ``self.defer``.
+This sensor is just a thin wrapper around the trigger. It defers to the 
trigger, and specifies a different method to come back to when the trigger 
fires. Because that method returns immediately, the sensor is marked as 
successful.
 
-Note that ``execution_timeout`` on Operators is considered over the *total 
runtime*, not individual executions in-between deferrals - this means that if 
``execution_timeout`` is set, an Operator may fail while it's deferred or while 
it's running after a deferral, even if it's only been resumed for a few seconds.
+The ``self.defer`` call raises the ``TaskDeferred`` exception, so it can work 
anywhere inside your operator's code, even when nested many calls deep inside 
``execute()``. You can also raise ``TaskDeferred`` manually, which uses the 
same arguments as ``self.defer``.
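
For instance, raising the exception directly is equivalent (a sketch; the
``method_name`` target is assumed to exist on your operator):

.. code-block:: python

    from datetime import timedelta

    from airflow.exceptions import TaskDeferred
    from airflow.triggers.temporal import TimeDeltaTrigger

    # Somewhere deep inside the operator's execute() call stack:
    raise TaskDeferred(
        trigger=TimeDeltaTrigger(timedelta(minutes=1)),
        method_name="execute_complete",
    )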
 
+``execution_timeout`` on operators is measured against the *total runtime*, 
not individual executions between deferrals. This means that if 
``execution_timeout`` is set, an operator can fail while it's deferred or while 
it's running after a deferral, even if it's only been resumed for a few seconds.
 
 Writing Triggers
 ~~~~~~~~~~~~~~~~
 
-A Trigger is written as a class that inherits from ``BaseTrigger``, and 
implements three methods:
+A *Trigger* is written as a class that inherits from ``BaseTrigger``, and 
implements three methods:
 
-* ``__init__``, to receive arguments from Operators instantiating it
-* ``run``, an asynchronous method that runs its logic and yields one or more 
``TriggerEvent`` instances as an asynchronous generator
-* ``serialize``, which returns the information needed to re-construct this 
trigger, as a tuple of the classpath, and keyword arguments to pass to 
``__init__``
+* ``__init__``: A method to receive arguments from operators instantiating it.
+* ``run``: An asynchronous method that runs its logic and yields one or more 
``TriggerEvent`` instances as an asynchronous generator.
+* ``serialize``: Returns the information needed to re-construct this trigger, 
as a tuple of the classpath, and keyword arguments to pass to ``__init__``.
 
-There's also some design constraints to be aware of:
+There are some design constraints to be aware of when writing your own trigger:
 
 * The ``run`` method *must be asynchronous* (using Python's asyncio), and 
correctly ``await`` whenever it does a blocking operation.
 * ``run`` must ``yield`` its TriggerEvents, not return them. If it returns 
before yielding at least one event, Airflow will consider this an error and 
fail any Task Instances waiting on it. If it throws an exception, Airflow will 
also fail any dependent task instances.
-* You should assume that a trigger instance may run *more than once* (this can 
happen if a network partition occurs and Airflow re-launches a trigger on a 
separated machine). So you must be mindful about side effects. For example you 
might not want to use a trigger to insert database rows.
-* If your trigger is designed to emit more than one event (not currently 
supported), then each emitted event *must* contain a payload that can be used 
to deduplicate events if the trigger is being run in multiple places. If you 
only fire one event and don't need to pass information back to the Operator, 
you can just set the payload to ``None``.
-* A trigger may be suddenly removed from one triggerer service and started on 
a new one, for example if subnets are changed and a network partition results, 
or if there is a deployment. If desired you may implement the ``cleanup`` 
method, which is always called after ``run`` whether the trigger exits cleanly 
or otherwise.
+* You should assume that a trigger instance can run *more than once*. This can 
happen if a network partition occurs and Airflow re-launches a trigger on a 
separated machine. So, you must be mindful about side effects. For example, you 
might not want to use a trigger to insert database rows.
+* If your trigger is designed to emit more than one event (not currently 
supported), then each emitted event *must* contain a payload that can be used 
to deduplicate events if the trigger is running in multiple places. If you only 
fire one event and don't need to pass information back to the operator, you can 
just set the payload to ``None``.
+* A trigger can suddenly be removed from one triggerer service and started on 
a new one, for example, if subnets are changed and a network partition 
results, or if there is a deployment. If desired, you can implement the 
``cleanup`` method, which is always called after ``run``, whether the trigger 
exits cleanly or otherwise. A sketch follows this list.
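
A minimal sketch of these constraints together, assuming a hypothetical
external system to poll (the classpath and helper method are made up):

.. code-block:: python

    import asyncio

    from airflow.triggers.base import BaseTrigger, TriggerEvent


    class HypotheticalPollTrigger(BaseTrigger):
        def __init__(self, poll_interval: float = 30.0):
            super().__init__()
            self.poll_interval = poll_interval

        def serialize(self):
            # Classpath plus the kwargs needed to re-instantiate the trigger.
            return (
                "my_provider.triggers.HypotheticalPollTrigger",
                {"poll_interval": self.poll_interval},
            )

        async def run(self):
            # Always ``await`` blocking operations so other triggers can run.
            while not await self._condition_met():
                await asyncio.sleep(self.poll_interval)
            # Fire one event; the payload lets redundant runs be de-duplicated.
            yield TriggerEvent({"status": "done"})

        async def _condition_met(self) -> bool:
            # Replace with a real asynchronous check of the external system.
            return True

        async def cleanup(self):
            # Called after ``run``, whether the trigger exits cleanly or not.
            self.log.info("Releasing any resources held by this trigger")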
 
 .. note::
 
-    Currently Triggers are only used up to their first event, as they are only 
used for resuming deferred tasks (which happens on the first event fired). 
However, we plan to allow DAGs to be launched from triggers in future, which is 
where multi-event triggers will be more useful.
+    Currently triggers are only used until their first event, because they are 
only used for resuming deferred tasks, and tasks resume after the first event 
fires. However, Airflow plans to allow DAGs to be launched from triggers in 
the future, which is where multi-event triggers will be more useful.
 
 
-Here's the structure of a basic Trigger
+This example shows the structure of a basic trigger, a very simplified version 
of Airflow's ``DateTimeTrigger``:
 
 .. code-block:: python
 
@@ -185,35 +187,33 @@ Here's the structure of a basic Trigger
             yield TriggerEvent(self.moment)
 
 
-This is a very simplified version of Airflow's ``DateTimeTrigger``, and you 
can see several things here:
+The code example shows several things:
 
-* ``__init__`` and ``serialize`` are written as a pair; the Trigger is 
instantiated once when it is submitted by the Operator as part of its deferral 
request, then serialized and re-instantiated on any *triggerer* process that 
runs the trigger.
-* The ``run`` method is declared as an ``async def``, as it *must* be 
asynchronous, and uses ``asyncio.sleep`` rather than the regular ``time.sleep`` 
(as that would block the process).
+* ``__init__`` and ``serialize`` are written as a pair. The trigger is 
instantiated once when it is submitted by the operator as part of its deferral 
request, then serialized and re-instantiated on any triggerer process that runs 
the trigger.
+* The ``run`` method is declared as an ``async def``, as it *must* be 
asynchronous, and uses ``asyncio.sleep`` rather than the regular ``time.sleep`` 
(because that would block the process).
 * When it emits its event it packs ``self.moment`` in there, so if this 
trigger is being run redundantly on multiple hosts, the event can be 
de-duplicated.
 
-Triggers can be as complex or as simple as you like provided you keep inside 
this contract; they are designed to be run in a highly-available fashion, 
auto-distributed among hosts running the *triggerer*. We encourage you to avoid 
any kind of persistent state in a trigger; they should get everything they need 
from their ``__init__``, so they can be serialized and moved around freely.
-
-If you are new to writing asynchronous Python, you should be very careful 
writing your ``run()`` method; Python's async model means that any code that 
does not correctly ``await`` when it does a blocking operation will block the 
*entire process*. Airflow will attempt to detect this and warn you in the 
triggerer logs when it happens, but we strongly suggest you set the variable 
``PYTHONASYNCIODEBUG=1`` when you are writing your Trigger to enable extra 
checks from Python to make sure you'r [...]
+Triggers can be as complex or as simple as you want, provided they meet the 
design constraints. They can run in a highly-available fashion, and are 
auto-distributed among hosts running the triggerer. We encourage you to avoid 
any kind of persistent state in a trigger. Triggers should get everything they 
need from their ``__init__``, so they can be serialized and moved around freely.
 
+If you are new to writing asynchronous Python, be very careful when writing 
your ``run()`` method. Python's async model means that code can block the 
entire process if it does not correctly ``await`` when it does a blocking 
operation. Airflow attempts to detect process blocking code and warn you in the 
triggerer logs when it happens. You can enable extra checks from Python by 
setting the environment variable ``PYTHONASYNCIODEBUG=1`` when you are writing 
your trigger to make sure you're writing non-blo [...]
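
A minimal illustration of the pitfall, with two hypothetical ``run``-style
coroutines:

.. code-block:: python

    import asyncio
    import time


    async def run_blocking():
        # BAD: time.sleep() blocks the triggerer's entire event loop,
        # stalling every other trigger in the same process.
        time.sleep(30)


    async def run_non_blocking():
        # GOOD: awaiting yields control so other triggers keep running.
        await asyncio.sleep(30)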
 
 High Availability
 -----------------
 
-Triggers are designed from the ground-up to be highly-available; if you want 
to run a highly-available setup, simply run multiple copies of ``triggerer`` on 
multiple hosts. Much like ``scheduler``, they will automatically co-exist with 
correct locking and HA.
-
-Depending on how much work the triggers are doing, you can fit from hundreds 
to tens of thousands of triggers on a single ``triggerer`` host. By default, 
every ``triggerer`` will have a capacity of 1000 triggers it will try to run at 
once; you can change this with the ``--capacity`` argument. If you have more 
triggers trying to run than you have capacity across all of your ``triggerer`` 
processes, some triggers will be delayed from running until others have 
completed.
+Triggers are designed to work in a high availability (HA) architecture. If you 
want to run a high availability setup, run multiple copies of ``triggerer`` on 
multiple hosts. Much like ``scheduler``, they automatically co-exist with 
correct locking and HA.
 
-Airflow tries to only run triggers in one place at once, and maintains a 
heartbeat to all ``triggerers`` that are currently running. If a ``triggerer`` 
dies, or becomes partitioned from the network where Airflow's database is 
running, Airflow will automatically re-schedule triggers that were on that host 
to run elsewhere (after waiting (2.1 * ``triggerer.job_heartbeat_sec``) seconds 
for the machine to re-appear).
+Depending on how much work the triggers are doing, you can fit hundreds to 
tens of thousands of triggers on a single ``triggerer`` host. By default, every 
``triggerer`` has a capacity of 1000 triggers that it can try to run at once. 
You can change the number of triggers that can run simultaneously with the 
``--capacity`` argument. If you have more triggers trying to run than you have 
capacity across all of your ``triggerer`` processes, some triggers will be 
delayed from running until oth [...]
 
-This means it's possible, but unlikely, for triggers to run in multiple places 
at once; this is designed into the Trigger contract, however, and entirely 
expected. Airflow will de-duplicate events fired when a trigger is running in 
multiple places simultaneously, so this process should be transparent to your 
Operators.
+Airflow tries to only run triggers in one place at once, and maintains a 
heartbeat to all ``triggerers`` that are currently running. If a ``triggerer`` 
dies, or becomes partitioned from the network where Airflow's database is 
running, Airflow automatically re-schedules triggers that were on that host to 
run elsewhere. Airflow waits (2.1 * ``triggerer.job_heartbeat_sec``) seconds 
for the machine to re-appear before rescheduling the triggers.
 
-Note that every extra ``triggerer`` you run will result in an extra persistent 
connection to your database.
+This means it's possible, but unlikely, for triggers to run in multiple places 
at once. This behavior is designed into the trigger contract, however, and is 
expected. Airflow de-duplicates events fired when a trigger is running 
in multiple places simultaneously, so this process is transparent to your 
operators.
 
+Note that every extra ``triggerer`` you run results in an extra persistent 
connection to your database.
 
 Difference between Mode='reschedule' and Deferrable=True in Sensors
 -------------------------------------------------------------------
 
-In Airflow, Sensors wait for specific conditions to be met before proceeding 
with downstream tasks. Sensors have two options for managing idle periods: 
mode='reschedule' and deferrable=True. As mode='reschedule' is a parameter 
specific to the BaseSensorOperator in Airflow, which allows the sensor to 
reschedule itself if the condition is not met, whereas, 'deferrable=True' is a 
convention used by some operators to indicate that the task can be retried (or 
deferred) later, but it is not a  [...]
+In Airflow, sensors wait for specific conditions to be met before proceeding 
with downstream tasks. Sensors have two options for managing idle periods: 
``mode='reschedule'`` and ``deferrable=True``. ``mode='reschedule'`` is a 
parameter specific to the BaseSensorOperator in Airflow that allows the sensor 
to reschedule itself if the condition is not met. ``deferrable=True`` is a 
convention used by some operators to indicate that the task can be retried (or 
deferred) later, but it  [...]
 
 
+--------------------------------------------------------+--------------------------------------------------------+
|           mode='reschedule'                            |          deferrable=True                               |
@@ -221,7 +221,7 @@ In Airflow, Sensors wait for specific conditions to be met before proceeding wit
| Continuously reschedules itself until condition is met |  Pauses execution when idle, resumes when condition    |
|                                                        |  changes                                               |
+--------------------------------------------------------+--------------------------------------------------------+
-| Resource Usage is Higher (repeated execution)          |  Resource Usage is Lower (pauses when idle, frees      |
+| Resource use is higher (repeated execution)            |  Resource use is lower (pauses when idle, frees        |
|                                                        |  up worker slots)                                      |
+--------------------------------------------------------+--------------------------------------------------------+
| Conditions expected to change over time                |  Waiting for external events or resources              |
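
To make the contrast concrete, a minimal sketch of the two styles side by
side, inside a DAG definition (task ids and times are hypothetical; both
sensors ship with Airflow):

.. code-block:: python

    import datetime

    from airflow.sensors.time_sensor import TimeSensor, TimeSensorAsync

    # mode='reschedule': the worker slot is released between pokes, but the
    # scheduler repeatedly re-queues the task onto a worker to poke again.
    wait_rescheduled = TimeSensor(
        task_id="wait_rescheduled",
        target_time=datetime.time(9, 0),
        mode="reschedule",
        poke_interval=300,
    )

    # deferrable variant: waiting is handed off to the triggerer; the task
    # returns to a worker only after the trigger fires.
    wait_deferred = TimeSensorAsync(
        task_id="wait_deferred",
        target_time=datetime.time(9, 0),
    )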
