This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 05639e19da1b9c649394254d0e50692236b9ad3d
Author: Niko Oliveira <oniko...@amazon.com>
AuthorDate: Tue Oct 10 11:47:33 2023 -0700

    Executors doc update (#34324)
    
    * Executors doc update
    
    Added sections describing the BaseExecutor interface in more detail
    to help in writing custom executors
    
    (cherry picked from commit 8fdf3582c2967161dd794f7efb53691d092f0ce6)
---
 .../core-concepts/executor/debug.rst               |  84 ++++++-------
 .../core-concepts/executor/index.rst               | 140 +++++++++++++++++++--
 docs/apache-airflow/public-airflow-interface.rst   |   6 +-
 3 files changed, 177 insertions(+), 53 deletions(-)

diff --git a/docs/apache-airflow/core-concepts/executor/debug.rst b/docs/apache-airflow/core-concepts/executor/debug.rst
index fb14c7a6f0..896f79ce1c 100644
--- a/docs/apache-airflow/core-concepts/executor/debug.rst
+++ b/docs/apache-airflow/core-concepts/executor/debug.rst
@@ -15,8 +15,47 @@
     specific language governing permissions and limitations
     under the License.
 
+.. _executor:DebugExecutor:
+
+Debug Executor (deprecated)
+===========================
+
+The :class:`~airflow.executors.debug_executor.DebugExecutor` is meant as
+a debug tool and can be used from IDE. It is a single process executor that
+queues :class:`~airflow.models.taskinstance.TaskInstance` and executes them by 
running
+``_run_raw_task`` method.
+
+Due to its nature the executor can be used with SQLite database. When used
+with sensors the executor will change sensor mode to ``reschedule`` to avoid
+blocking the execution of DAG.
+
+Additionally ``DebugExecutor`` can be used in a fail-fast mode that will make
+all other running or scheduled tasks fail immediately. To enable this option 
set
+``AIRFLOW__DEBUG__FAIL_FAST=True`` or adjust ``fail_fast`` option in your 
``airflow.cfg``.
+For more information on setting the configuration, see 
:doc:`../../howto/set-config`.
+
+**IDE setup steps:**
+
+1. Add ``main`` block at the end of your DAG file to make it runnable.
+
+It will run a backfill job:
+
+.. code-block:: python
+
+  if __name__ == "__main__":
+      from airflow.utils.state import State
+
+      dag.clear()
+      dag.run()
+
+
+2. Setup ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in run configuration of 
your IDE. In
+   this step you should also setup all environment variables required by your 
DAG.
+
+3. Run / debug the DAG file.
+
 Testing DAGs with dag.test()
-=============================
+*****************************
 
 To debug DAGs in an IDE, you can set up the ``dag.test`` command in your dag 
file and run through your DAG in a single
 serialized python process.
@@ -35,7 +74,7 @@ and that's it! You can add argument such as 
``execution_date`` if you want to te
 you can run or debug DAGs as needed.
 
 Comparison with DebugExecutor
-*****************************
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 The ``dag.test`` command has the following benefits over the 
:class:`~airflow.executors.debug_executor.DebugExecutor`
 class, which is now deprecated:
@@ -46,7 +85,7 @@ class, which is now deprecated:
 
 
 Debugging Airflow DAGs on the command line
-==========================================
+******************************************
 
 With the same two line addition as mentioned in the above section, you can now 
easily debug a DAG using pdb as well.
 Run ``python -m pdb <path to dag file>.py`` for an interactive debugging 
experience on the command line.
@@ -63,42 +102,3 @@ Run ``python -m pdb <path to dag file>.py`` for an 
interactive debugging experie
   -> bash_command='echo 1',
   (Pdb) run_this_last
   <Task(EmptyOperator): run_this_last>
-
-.. _executor:DebugExecutor:
-
-Debug Executor (deprecated)
-===========================
-
-The :class:`~airflow.executors.debug_executor.DebugExecutor` is meant as
-a debug tool and can be used from IDE. It is a single process executor that
-queues :class:`~airflow.models.taskinstance.TaskInstance` and executes them by 
running
-``_run_raw_task`` method.
-
-Due to its nature the executor can be used with SQLite database. When used
-with sensors the executor will change sensor mode to ``reschedule`` to avoid
-blocking the execution of DAG.
-
-Additionally ``DebugExecutor`` can be used in a fail-fast mode that will make
-all other running or scheduled tasks fail immediately. To enable this option 
set
-``AIRFLOW__DEBUG__FAIL_FAST=True`` or adjust ``fail_fast`` option in your 
``airflow.cfg``.
-For more information on setting the configuration, see 
:doc:`../../howto/set-config`.
-
-**IDE setup steps:**
-
-1. Add ``main`` block at the end of your DAG file to make it runnable.
-
-It will run a backfill job:
-
-.. code-block:: python
-
-  if __name__ == "__main__":
-      from airflow.utils.state import State
-
-      dag.clear()
-      dag.run()
-
-
-2. Setup ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in run configuration of 
your IDE. In
-   this step you should also setup all environment variables required by your 
DAG.
-
-3. Run / debug the DAG file.
diff --git a/docs/apache-airflow/core-concepts/executor/index.rst b/docs/apache-airflow/core-concepts/executor/index.rst
index 10d95355bf..4a2761a0cf 100644
--- a/docs/apache-airflow/core-concepts/executor/index.rst
+++ b/docs/apache-airflow/core-concepts/executor/index.rst
@@ -30,13 +30,6 @@ Built-in executors are referred to by name, for example:
     [core]
     executor = KubernetesExecutor
 
-You can also write your own custom executors, and refer to them by their full 
path:
-
-.. code-block:: ini
-
-    [core]
-    executor = my_company.executors.MyCustomExecutor
-
 .. note::
     For more information on Airflow's configuration, see 
:doc:`/howto/set-config`.
 
@@ -51,7 +44,7 @@ If you want to check which executor is currently set, you can 
use the ``airflow
 Executor Types
 --------------
 
-There are two types of executor - those that run tasks *locally* (inside the 
``scheduler`` process), and those that run their tasks *remotely* (usually via 
a pool of *workers*). Airflow comes configured with the ``SequentialExecutor`` 
by default, which is a local executor, and the safest option for execution, but 
we *strongly recommend* you change this to ``LocalExecutor`` for small, 
single-machine installations, or one of the remote executors for a 
multi-machine/cloud installation.
+There are two types of executors - those that run tasks *locally* (inside the 
``scheduler`` process), and those that run their tasks *remotely* (usually via 
a pool of *workers*). Airflow comes configured with the ``SequentialExecutor`` 
by default, which is a local executor, and the simplest option for execution. 
However, the ``SequentialExecutor`` is not suitable for production since it 
does not allow for parallel task running and due to that, some Airflow features 
(e.g. running sensors) [...]
 
 
 **Local Executors**
@@ -78,3 +71,134 @@ There are two types of executor - those that run tasks 
*locally* (inside the ``s
 .. note::
 
     New Airflow users may assume they need to run a separate executor process 
using one of the Local or Remote Executors. This is not correct. The executor 
logic runs *inside* the scheduler process, and will run the tasks locally or 
not depending on the executor selected.
+
+Writing Your Own Executor
+-------------------------
+
+All Airflow executors implement a common interface so that they are pluggable 
and any executor has access to all abilities and integrations within Airflow. 
Primarily, the Airflow scheduler uses this interface to interact with the 
executor, but other components such as logging, CLI and backfill do as well.
+The public interface is the 
:class:`~airflow.executors.base_executor.BaseExecutor`. You can look through 
the code for the most detailed and up to date interface, but some important 
highlights are outlined below.
+
+.. note::
+    For more information about Airflow's public interface see 
:doc:`/public-airflow-interface`.
+
+Some reasons you may want to write a custom executor include:
+
+* An executor does not exist which fits your specific use case, such as a 
specific tool or service for compute.
+* You'd like to use an executor that leverages a compute service from your 
preferred cloud provider.
+* You have a private tool/service for task execution that is only available to 
you or your organization.
+
+
+Important BaseExecutor Methods
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+These methods don't require overriding to implement your own executor, but are 
useful to be aware of:
+
+* ``heartbeat``: The Airflow scheduler Job loop will periodically call 
heartbeat on the executor. This is one of the main points of interaction 
between the Airflow scheduler and the executor. This method updates some 
metrics, triggers newly queued tasks to execute and updates state of 
running/completed tasks.
+* ``queue_command``: The Airflow scheduler will call this method of the 
BaseExecutor to provide tasks to be run by the executor. The BaseExecutor 
simply adds the TaskInstances to an internal list of queued tasks within the 
executor.
+* ``get_event_buffer``: The Airflow scheduler calls this method to retrieve 
the current state of the TaskInstances the executor is executing.
+* ``has_task``: The scheduler uses this BaseExecutor method to determine if an 
executor already has a specific task instance queued or running.
+* ``send_callback``: Sends any callbacks to the sink configured on the 
executor.
+
+
+Mandatory Methods to Implement
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The following methods must be overridden at minimum to have your executor 
supported by Airflow:
+
+* ``sync``: Sync will get called periodically during executor heartbeats. 
Implement this method to update the state of the tasks which the executor knows 
about. Optionally, it can also attempt to execute queued tasks that have been 
received from the scheduler.
+* ``execute_async``: Executes a command asynchronously. A command in this 
context is an Airflow CLI command to run an Airflow task. This method is called 
(after a few layers) during executor heartbeat which is run periodically by the 
scheduler. In practice, this method often just enqueues tasks into an internal 
or external queue of tasks to be run (e.g. ``KubernetesExecutor``), but it can 
also execute the tasks directly (e.g. ``LocalExecutor``). This will 
depend on the executor.
+
+
+Optional Interface Methods to Implement
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The following methods aren't required to override to have a functional Airflow 
executor. However, some powerful capabilities and stability can come from 
implementing them:
+
+* ``start``: The Airflow scheduler (and backfill) job will call this method 
after it initializes the executor object. Any additional setup required by the 
executor can be completed here.
+* ``end``: The Airflow scheduler (and backfill) job will call this method as 
it is tearing down. Any synchronous cleanup required to finish running jobs 
should be done here.
+* ``terminate``: More forcefully stop the executor, even killing/stopping 
in-flight tasks instead of synchronously waiting for completion.
+* ``cleanup_stuck_queued_tasks``: If tasks are stuck in the queued state for 
longer than ``task_queued_timeout`` then they are collected by the scheduler 
and provided to the executor to have an opportunity to handle them (perform any 
graceful cleanup/teardown) via this method and return the Task Instances for a 
warning message displayed to users.
+* ``try_adopt_task_instances``: Tasks that have been abandoned (e.g. from a 
scheduler job that died) are provided to the executor to adopt or otherwise 
handle them via this method. Any tasks that cannot be adopted (by default the 
BaseExecutor assumes all cannot be adopted) should be returned.
+* ``get_cli_commands``: Executors may vend CLI commands to users by 
implementing this method, see the `CLI`_ section below for more details.
+* ``get_task_log``: Executors may vend log messages to Airflow task logs by 
implementing this method, see the `Logging`_ section below for more details.
+
+Compatibility Attributes
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+The ``BaseExecutor`` class interface contains a set of attributes that Airflow 
core code uses to check the features that your executor is compatible with. 
When writing your own Airflow executor be sure to set these correctly for your 
use case. Each attribute is simply a boolean to enable/disable a feature or 
indicate that a feature is supported/unsupported by the executor:
+
+* ``supports_pickling``: Whether or not the executor supports reading pickled 
DAGs from the Database before execution (rather than reading the DAG definition 
from the file system).
+* ``supports_sentry``: Whether or not the executor supports `Sentry 
<https://sentry.io>`_.
+
+* ``is_local``: Whether or not the executor is remote or local. See the 
`Executor Types`_ section above.
+* ``is_single_threaded``: Whether or not the executor is single threaded. This 
is particularly relevant to what database backends are supported. Single 
threaded executors can run with any backend, including SQLite.
+* ``is_production``: Whether or not the executor should be used for production 
purposes. A UI message is displayed to users when they are using a 
non-production ready executor.
+
+* ``change_sensor_mode_to_reschedule``: Whether or not sensors run with this 
executor should be switched to ``reschedule`` mode. Running Airflow sensors in 
poke mode can block the thread of executors and in some cases Airflow.
+* ``serve_logs``: Whether or not the executor supports serving logs, see 
:doc:`/administration-and-deployment/logging-monitoring/logging-tasks`.
+
+CLI
+^^^
+
+Executors may vend CLI commands which will be included in the ``airflow`` 
command line tool by implementing the ``get_cli_commands`` method. Executors 
such as ``CeleryExecutor`` and ``KubernetesExecutor`` for example, make use of 
this mechanism. The commands can be used to set up required workers, initialize 
environment or set other configuration. Commands are only vended for the 
currently configured executor. A pseudo-code example of implementing CLI 
command vending from an executor can  [...]
+
+.. code-block:: python
+
+    @staticmethod
+    def get_cli_commands() -> list[GroupCommand]:
+        sub_commands = [
+            ActionCommand(
+                name="command_name",
+                help="Description of what this specific command does",
+                func=lazy_load_command("path.to.python.function.for.command"),
+                args=(),
+            ),
+        ]
+
+        return [
+            GroupCommand(
+                name="my_cool_executor",
+                help="Description of what this group of commands do",
+                subcommands=sub_commands,
+            ),
+        ]
+
+.. note::
+    Currently there are no strict rules in place for the Airflow command 
namespace. It is up to developers to use names for their CLI commands that are 
sufficiently unique so as to not cause conflicts with other Airflow executors 
or components.
+
+.. note::
+    When creating a new executor, or updating any existing executors, be sure 
to not import or execute any expensive operations/code at the module level. 
Executor classes are imported in several places and if they are slow to import 
this will negatively impact the performance of your Airflow environment, 
especially for CLI commands.
+
+Logging
+^^^^^^^
+
+Executors may vend log messages which will be included in the Airflow task 
logs by implementing the ``get_task_log`` method. This can be helpful if the 
execution environment has extra context in the case of task failures, which may 
be due to the execution environment itself rather than the Airflow task code. 
It can also be helpful to include setup/teardown logging from the execution 
environment.
+The ``KubernetesExecutor`` leverages this capability to include logs from 
the pod which ran a specific Airflow task and display them in the logs for that 
Airflow task. A pseudo-code example of implementing task log vending from an 
executor can be seen below:
+
+.. code-block:: python
+
+    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
+        messages = []
+        log = []
+        try:
+            res = helper_function_to_fetch_logs_from_execution_env(ti, try_number)
+            for line in res:
+                log.append(remove_escape_codes(line.decode()))
+            if log:
+                messages.append("Found logs from execution environment!")
+        except Exception as e:  # No exception should cause task logs to fail
+            messages.append(f"Failed to find logs from execution environment: {e}")
+        return messages, ["\n".join(log)]
+
+Next Steps
+^^^^^^^^^^
+
+Once you have created a new executor class implementing the ``BaseExecutor`` 
interface, you can configure Airflow to use it by setting the ``core.executor`` 
configuration value to the module path of your executor:
+
+.. code-block:: ini
+
+    [core]
+    executor = my_company.executors.MyCustomExecutor
+
+.. note::
+    For more information on Airflow's configuration, see 
:doc:`/howto/set-config` and for more information on managing Python modules in 
Airflow see :doc:`/administration-and-deployment/modules_management`.
diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst
index 534cc0d0e6..eeeaa618e7 100644
--- a/docs/apache-airflow/public-airflow-interface.rst
+++ b/docs/apache-airflow/public-airflow-interface.rst
@@ -318,11 +318,11 @@ Executors
 
 Executors are the mechanism by which task instances get run. All executors are
 derived from :class:`~airflow.executors.base_executor.BaseExecutor`. There are 
several
-executor implementations built-in Airflow, each with its own unique 
characteristics and capabilities.
+executor implementations built-in Airflow, each with their own unique 
characteristics and capabilities.
 
-The executor interface itself (the BaseExecutor class) is public, but the 
built-in executors are not (i.e. KubernetesExecutor, LocalExecutor, etc).  This 
means that, to use KubernetesExecutor as an example, we may make changes to 
KubernetesExecutor in minor or patch Airflow releases which could break an 
executor that subclasses KubernetesExecutor.  This is necessary to allow 
Airflow developers sufficient freedom to continue to improve the executors we 
offer.  Accordingly if you want to m [...]
+The executor interface itself (the BaseExecutor class) is public, but the 
built-in executors are not (i.e. KubernetesExecutor, LocalExecutor, etc).  This 
means that, to use KubernetesExecutor as an example, we may make changes to 
KubernetesExecutor in minor or patch Airflow releases which could break an 
executor that subclasses KubernetesExecutor.  This is necessary to allow 
Airflow developers sufficient freedom to continue to improve the executors we 
offer.  Accordingly, if you want to  [...]
 
-You can read more about executors in :doc:`core-concepts/executor/index`.
+You can read more about executors and how to write your own in 
:doc:`core-concepts/executor/index`.
 
 .. versionadded:: 2.6
 
