This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 42aeb4b398c Documentation for ResumableJobMixin and resumable tasks (#68136)
42aeb4b398c is described below
commit 42aeb4b398c71c2162055fb9f02b7f5554be41d8
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jun 8 14:53:37 2026 +0530
Documentation for ResumableJobMixin and resumable tasks (#68136)
---
.../docs/authoring-and-scheduling/deferring.rst | 3 +
airflow-core/docs/core-concepts/index.rst | 1 +
.../docs/core-concepts/resumable-tasks.rst | 187 +++++++++++++++++++++
task-sdk/docs/deferred-vs-async-operators.rst | 9 +
task-sdk/docs/index.rst | 1 +
task-sdk/docs/resumable-job-mixin.rst | 167 ++++++++++++++++++
6 files changed, 368 insertions(+)
diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst b/airflow-core/docs/authoring-and-scheduling/deferring.rst
index 95c7c2b0b47..8a074e4d66e 100644
--- a/airflow-core/docs/authoring-and-scheduling/deferring.rst
+++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst
@@ -34,6 +34,9 @@ This is where *Deferrable Operators* can be used. When it has nothing to do but
For guidance on when to use deferred operators versus async tasks,
see `Deferred vs Async Operators
<https://airflow.apache.org/docs/task-sdk/stable/deferred-vs-async-operators.html>`__.
+ For guidance on when to use deferrable operators versus resumable tasks
+ (crash-safe synchronous operators that use the task state store), see
+ :ref:`Resumable Tasks <concepts-resumable-tasks>`.
An overview of how this process works:
diff --git a/airflow-core/docs/core-concepts/index.rst b/airflow-core/docs/core-concepts/index.rst
index 5e53609d8d8..c3a9a8694f5 100644
--- a/airflow-core/docs/core-concepts/index.rst
+++ b/airflow-core/docs/core-concepts/index.rst
@@ -39,6 +39,7 @@ Here you can find detailed documentation about each one of the core concepts of
operators
sensors
taskflow
+ resumable-tasks
executor/index
auth-manager/index
multi-team
diff --git a/airflow-core/docs/core-concepts/resumable-tasks.rst b/airflow-core/docs/core-concepts/resumable-tasks.rst
new file mode 100644
index 00000000000..6371c0ba7c8
--- /dev/null
+++ b/airflow-core/docs/core-concepts/resumable-tasks.rst
@@ -0,0 +1,187 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts-resumable-tasks:
+
+Resumable Tasks
+===============
+
+.. versionadded:: 3.3.0
+
+Many data engineering workflows involve submitting work to an external system and waiting for it
+to complete. A Spark job, a BigQuery query, a Kubernetes batch pod, an EMR step: these are all
+tasks where the real work happens outside Airflow, and the operator's job is mostly to submit,
+poll, and collect the result.
+
+These tasks share a common failure mode. With a classic synchronous operator, the worker slot is
+held for the entire polling duration, and if the worker process is restarted or the host is
+preempted, the task retries from scratch and loses all the progress it made. Depending on the
+operator, that can mean the external job is submitted again, creating a duplicate run in the
+external system.
+
+Airflow offers three task patterns to consider for this kind of work. Understanding the trade-offs
+between them helps you choose the right one for your situation.
+
+.. _concepts-resumable-tasks-deferrable:
+
+Deferrable Operators
+--------------------
+
+A deferrable operator pauses itself at the point where it would otherwise start polling, hands
+the polling work to the Triggerer component, and releases its worker slot. When the external
+condition is met, the Triggerer wakes the task and the worker resumes from where the operator
+left off.
+
+This is the most resource-efficient option. A single Triggerer process can concurrently watch
+thousands of conditions, so the rest of the worker pool stays free for other tasks.
+
+The trade-offs are:
+
+* A Triggerer component must be running. Deployments that do not include a Triggerer cannot use
+  this pattern.
+* Writing a custom deferrable operator requires implementing a dedicated ``Trigger`` class in
+  addition to the operator itself.
+* The polling logic runs inside the Triggerer's async event loop. A blocking call inside a
+  Trigger stalls that event loop, and with it every other trigger running in the same Triggerer
+  process.
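To see why a blocking call is harmful, here is a plain ``asyncio`` sketch. It is not an Airflow ``Trigger``; ``blocking_status_call``, ``heartbeat``, and ``poll_once`` are hypothetical names. The heartbeat coroutine stands in for other triggers sharing the event loop. When the simulated synchronous SDK call runs directly on the loop, the heartbeat cannot tick until the call returns. When the call is offloaded with ``asyncio.to_thread``, the heartbeat keeps running.

```python
import asyncio
import time


def blocking_status_call(seconds: float) -> str:
    """Hypothetical synchronous SDK call, e.g. a client without async support."""
    time.sleep(seconds)
    return "SUCCEEDED"


async def heartbeat(stop: asyncio.Event, ticks: list) -> None:
    """Stands in for other triggers that share the same event loop."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def poll_once(offload: bool) -> int:
    """Run one status call and count how often the heartbeat ticked meanwhile."""
    stop = asyncio.Event()
    ticks: list = []
    hb = asyncio.create_task(heartbeat(stop, ticks))
    await asyncio.sleep(0)  # give the heartbeat its first turn
    if offload:
        # The call runs in a worker thread, so the event loop stays responsive.
        await asyncio.to_thread(blocking_status_call, 0.2)
    else:
        # The call runs on the event loop thread, so nothing else can run.
        blocking_status_call(0.2)
    stop.set()
    await hb
    return len(ticks)


print(asyncio.run(poll_once(offload=False)))  # prints 1: only the tick before the call
print(asyncio.run(poll_once(offload=True)))   # many ticks while the call was in flight
```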
+
+If a deferrable operator already exists for your use case, or your team is comfortable
+implementing one, this is the recommended path because of its resource efficiency.
+
+For more details, see :doc:`../authoring-and-scheduling/deferring`.
+
+.. _concepts-resumable-tasks-resumable:
+
+Resumable Tasks
+---------------
+
+A resumable task uses the task state store to persist checkpoints as it makes progress. On retry,
+the task reads the latest checkpoint and continues from where it left off rather than starting
+over.
+
+The worker slot is held for the full duration of the task, the same as a standard synchronous
+operator. The benefit is crash safety and continuity, not resource efficiency.
+
+Resumable tasks are useful when:
+
+* No deferrable operator exists for your external system and writing one is not practical.
+* You want crash recovery without operating a Triggerer.
+* The task processes work incrementally (for example, reading files from a list or paginating
+  through API results) and should be able to resume from its last completed batch.
+
+**General pattern**
+
+The task reads a checkpoint from ``task_store`` at the start, does some work, writes an updated
+checkpoint, and either continues or finishes. On the next run (whether due to a retry after a
+crash or a deliberate reschedule), it reads the checkpoint again and picks up from there.
+
+.. code-block:: python
+
+    from airflow.sdk import dag, task
+
+
+    @dag(schedule=None)
+    def process_files_dag():
+
+        @task(retries=5)
+        def process_files(**context):
+            task_store = context["task_store"]
+            files = ["a.csv", "b.csv", "c.csv", "d.csv"]
+
+            # Resume after the last file recorded as done, or start from the beginning.
+            last_processed = task_store.get("last_processed")
+            start_index = 0
+            if last_processed is not None:
+                start_index = files.index(last_processed) + 1
+
+            for file in files[start_index:]:
+                # ... process the file ...
+                # Checkpoint only after the file has been fully processed.
+                task_store.set("last_processed", file)
+
+        process_files()
+
+
+    process_files_dag()
+
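+This pattern needs no extra base class or component; it relies only on the ``task_store`` entry in
+the task context. The state store is a key-value store scoped to the task instance, and what you
+checkpoint is up to you. Because the checkpoint is written after each file, a crash between
+processing a file and recording it means that file is processed again on retry, so per-item work
+should be safe to repeat.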
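The checkpoint-and-resume logic can be exercised without Airflow. This sketch assumes only that ``task_store`` offers ``get(key)`` and ``set(key, value)``, as used in the example above. ``InMemoryTaskStore`` and the ``crash_on`` parameter are hypothetical stand-ins used to simulate a worker crash partway through the list.

```python
from __future__ import annotations


class InMemoryTaskStore:
    """Hypothetical stand-in for ``context["task_store"]``, backed by a dict."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    def get(self, key: str) -> str | None:
        return self._data.get(key)

    def set(self, key: str, value: str) -> None:
        self._data[key] = value


def process_files(task_store, files: list[str], processed: list[str], crash_on: str | None = None) -> None:
    # Resume after the last checkpointed file, or start from the beginning.
    last_processed = task_store.get("last_processed")
    start_index = 0 if last_processed is None else files.index(last_processed) + 1
    for file in files[start_index:]:
        if file == crash_on:
            # Simulate the worker being lost before this file is finished.
            raise RuntimeError(f"worker lost while processing {file}")
        processed.append(file)  # stands in for the real per-file work
        task_store.set("last_processed", file)  # checkpoint after the work is done


files = ["a.csv", "b.csv", "c.csv", "d.csv"]
store = InMemoryTaskStore()
processed: list[str] = []

try:
    process_files(store, files, processed, crash_on="c.csv")  # first attempt crashes
except RuntimeError:
    pass
process_files(store, files, processed)  # the retry resumes at c.csv

print(processed)  # ['a.csv', 'b.csv', 'c.csv', 'd.csv']
```

Each file appears exactly once in ``processed`` even though the first attempt failed, because the retry starts from the index after the stored checkpoint.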
+
+**Resumable operators for external jobs**
+
+When the task submits a job to an external system and then polls for completion, there is an
+additional problem: on retry, the task would submit a second job even though the first one may
+still be running. :class:`~airflow.sdk.ResumableJobMixin` handles this for you: it persists the
+external job identifier before polling starts and, on retry, reconnects to the existing job
+instead of submitting a new one.
+
+For more details and a working example, see :doc:`task-sdk:resumable-job-mixin`.
+
+.. _concepts-resumable-tasks-async:
+
+Asynchronous Tasks
+------------------
+
+.. note::
+
+    Async task support applies to Python tasks only: ``@task`` decorated ``async def`` functions
+    and class-based operators that subclass ``BaseAsyncOperator``. It is not available for
+    other operator types.
+
+Python tasks support ``async``/``await`` syntax. When the decorated callable is an async
+function, Airflow runs it inside an event loop. This lets you run many I/O operations (HTTP
+requests, database queries, file reads) concurrently within a single task execution, so waiting
+on one operation does not block the others.
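A minimal sketch of this fan-out pattern in plain ``asyncio``. ``fetch_page`` is a hypothetical I/O call, with ``asyncio.sleep`` standing in for network latency. In a DAG, the body of ``fetch_all`` would live inside a ``@task``-decorated ``async def`` function; the decorator is left out here so the sketch runs on its own.

```python
import asyncio
import time


async def fetch_page(page: int) -> list:
    """Hypothetical I/O call; the sleep stands in for a network round trip."""
    await asyncio.sleep(0.1)
    return [page * 10 + i for i in range(3)]


async def fetch_all(pages: int) -> list:
    # Start every request at once; gather returns results in submission order.
    results = await asyncio.gather(*(fetch_page(p) for p in range(pages)))
    return [item for page in results for item in page]


start = time.monotonic()
items = asyncio.run(fetch_all(20))
elapsed = time.monotonic() - start
# The 20 waits of ~0.1 s overlap, so the total is close to 0.1 s rather than 2 s.
print(len(items), round(elapsed, 2))
```

The worker slot is still held for the whole run; the gain is that the waits overlap within that one slot.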
+
+The worker slot is held for the full duration of the task. Async tasks are not designed for
+long external waits or crash recovery; they are designed for high-throughput I/O work that
+completes within a single execution.
+
+For guidance on when to use async tasks versus deferrable operators, see
+:doc:`task-sdk:deferred-vs-async-operators`.
+
+.. _concepts-resumable-tasks-comparison:
+
+Comparison
+----------
+
+.. list-table::
+   :header-rows: 1
+
+   * - Characteristic
+     - Deferrable operator
+     - Resumable task
+     - Async task
+   * - Worker slot during external wait
+     - Freed
+     - Held
+     - Held
+   * - Requires Triggerer
+     - Yes
+     - No
+     - No
+   * - Handles crash recovery
+     - Yes (via Triggerer)
+     - Yes (via task store checkpoint)
+     - No
+   * - Prevents duplicate job submission
+     - Depends on operator implementation
+     - Yes (with ``ResumableJobMixin``)
+     - Not applicable
+   * - Suitable for concurrent I/O fan-out
+     - No
+     - No
+     - Yes
+   * - Available from
+     - Airflow 2.2
+     - Airflow 3.3
+     - Airflow 3.2
diff --git a/task-sdk/docs/deferred-vs-async-operators.rst b/task-sdk/docs/deferred-vs-async-operators.rst
index 4d77deea81d..466060ab3a4 100644
--- a/task-sdk/docs/deferred-vs-async-operators.rst
+++ b/task-sdk/docs/deferred-vs-async-operators.rst
@@ -25,6 +25,15 @@ Deferred vs Async Operators
Airflow 3.2 introduces Python-native async support for tasks, allowing concurrent I/O within a single worker slot.
This page explains how async operators differ from deferred operators and when to use each.
+.. versionchanged:: 3.3.0
+
+    Airflow 3.3 adds a third pattern for long-running tasks: resumable tasks, which use the
+    task state store to checkpoint progress across retries. For guidance on when to use
+    resumable tasks versus deferrable operators, see
+    :ref:`apache-airflow:concepts-resumable-tasks`. For details on the
+    :class:`~airflow.sdk.ResumableJobMixin` for external job crash recovery, see
+    :doc:`resumable-job-mixin`.
+
Deferred Operators
------------------
diff --git a/task-sdk/docs/index.rst b/task-sdk/docs/index.rst
index 7e3467c14ff..18e215d4ce7 100644
--- a/task-sdk/docs/index.rst
+++ b/task-sdk/docs/index.rst
@@ -173,6 +173,7 @@ For the full public API reference, see the :doc:`api` page.
examples
dynamic-task-mapping
deferred-vs-async-operators
+ resumable-job-mixin
api
concepts
executable-bundle-spec
diff --git a/task-sdk/docs/resumable-job-mixin.rst b/task-sdk/docs/resumable-job-mixin.rst
new file mode 100644
index 00000000000..32da769bb6a
--- /dev/null
+++ b/task-sdk/docs/resumable-job-mixin.rst
@@ -0,0 +1,167 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _sdk-resumable-job-mixin:
+
+ResumableJobMixin
+=================
+
+.. versionadded:: 3.3.0
+
+:class:`~airflow.sdk.ResumableJobMixin` is a mixin for operators that submit long-running jobs
+to an external system and poll for their completion. It makes the operator crash-safe by persisting
+the external job identifier to the task state store before polling begins. If the worker is
+restarted or the host is preempted, the next retry reconnects to the already-running job instead
+of submitting a duplicate.
+
+This mixin is not a replacement for deferrable operators. Deferrable operators free the
+worker slot during polling and are the recommended approach when a Triggerer is available.
+Use this mixin when you want crash safety on an existing synchronous operator without
+migrating to the deferrable pattern, or when your deployment does not include a Triggerer.
+
+For guidance on choosing between deferrable, resumable, and async approaches, see
+:ref:`apache-airflow:concepts-resumable-tasks`.
+
+Interface
+---------
+
+Subclasses must implement these six methods:
+
+.. code-block:: python
+
+    def submit_job(self, context: Context) -> JsonValue: ...
+    def get_job_status(self, external_id: JsonValue, context: Context) -> str: ...
+    def is_job_active(self, status: str) -> bool: ...
+    def is_job_succeeded(self, status: str) -> bool: ...
+    def poll_until_complete(self, external_id: JsonValue, context: Context) -> None: ...
+    def get_job_result(self, external_id: JsonValue, context: Context) -> Any: ...
+
+.. _sdk-resumable-job-mixin-implementing:
+
+Implementing the mixin
+----------------------
+
+Add :class:`~airflow.sdk.ResumableJobMixin` as a base class of your operator, then call
+``execute_resumable(context)`` from your ``execute`` method. The mixin requires you to
+implement six methods that describe how to interact with your external system:
+
+``submit_job(context)``
+    Submit the job and return its external identifier. The returned value is stored in
+    ``task_store`` and passed back to the other methods on retry. Return ``None`` only if
+    the external system does not provide a trackable identifier; in that case the mixin
+    cannot provide crash safety and will resubmit on every retry.
+
+``get_job_status(external_id, context)``
+    Query the external system for the current job status and return the raw status string it
+    reports. The mixin calls this method on retry to determine whether the job is still running,
+    has succeeded, or has failed.
+
+``is_job_active(status)``
+    Return ``True`` if the job is still running and can be reconnected to. ``status`` is the
+    raw string returned by ``get_job_status``: a backend-specific value from the external
+    system, not an Airflow state::
+
+        def is_job_active(self, status: str) -> bool:
+            return status in ("RUNNING", "PENDING", "ACCEPTED")
+
+``is_job_succeeded(status)``
+    Return ``True`` if the job completed successfully. ``status`` is the same raw string
+    from the external system::
+
+        def is_job_succeeded(self, status: str) -> bool:
+            return status == "SUCCEEDED"
+
+``poll_until_complete(external_id, context)``
+    Block until the job reaches a terminal state. Raise an exception if the job fails.
+
+``get_job_result(external_id, context)``
+    Return the job result after successful completion, or ``None`` if there is no result.
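One way to implement ``poll_until_complete`` is to reuse the status methods you already wrote. This sketch is an assumption, not the mixin's own code: ``StatusPollingSketch`` and its ``poll_interval`` attribute are hypothetical, and ``FakeBatchOperator`` replays a fixed list of statuses instead of calling a real system so the loop can run on its own.

```python
import time
from typing import Any


class StatusPollingSketch:
    """Hypothetical helper: builds poll_until_complete from the status methods.

    Subclasses provide get_job_status, is_job_active, and is_job_succeeded.
    """

    poll_interval: float = 30.0  # seconds between status checks (hypothetical)

    def poll_until_complete(self, external_id: Any, context: Any) -> None:
        while True:
            status = self.get_job_status(external_id, context)
            if self.is_job_succeeded(status):
                return
            if not self.is_job_active(status):
                # Neither succeeded nor still running: treat as a terminal failure.
                raise RuntimeError(f"Job {external_id!r} ended with status {status!r}")
            time.sleep(self.poll_interval)


class FakeBatchOperator(StatusPollingSketch):
    """Replays canned statuses so the polling loop can be tried without a backend."""

    poll_interval = 0.0

    def __init__(self, statuses: list) -> None:
        self._statuses = iter(statuses)
        self.checks = 0

    def get_job_status(self, external_id: Any, context: Any) -> str:
        self.checks += 1
        return next(self._statuses)

    def is_job_active(self, status: str) -> bool:
        return status in ("RUNNING", "PENDING", "QUEUED")

    def is_job_succeeded(self, status: str) -> bool:
        return status == "SUCCEEDED"


ok = FakeBatchOperator(["PENDING", "RUNNING", "SUCCEEDED"])
ok.poll_until_complete("job-1", context=None)
print(ok.checks)  # 3

failed = FakeBatchOperator(["RUNNING", "FAILED"])
try:
    failed.poll_until_complete("job-2", context=None)
except RuntimeError as exc:
    print(exc)  # Job 'job-2' ended with status 'FAILED'
```

Checking ``is_job_succeeded`` before ``is_job_active`` means any status that is neither is treated as a failure, so an unexpected backend status ends the loop instead of polling forever.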
+
+How it works
+------------
+
+On the first run, after ``submit_job`` returns the external identifier, the mixin persists
+that identifier to ``task_store`` before calling ``poll_until_complete``. If the worker
+crashes during polling, the next retry reads the stored identifier and calls ``get_job_status``
+to check the current state of the job:
+
+* If the job is still running, the mixin calls ``poll_until_complete`` to reconnect and continue
+  waiting.
+* If the job already completed successfully, the mixin calls ``get_job_result`` and returns
+  immediately without resubmitting.
+* If the job is in a terminal failure state, the mixin falls through and submits a fresh job.
+
+.. note::
+
+    There is a small window between ``submit_job`` returning and ``task_store.set`` completing.
+    If the worker crashes in that gap, the next retry does not have the identifier and will
+    submit a fresh job. For most workloads this window is negligible.
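The retry behavior described above can be simulated end to end. This sketch restates the documented decision flow in a standalone function. ``execute_resumable_sketch``, ``FakeBackend``, and ``FakeOperator`` are hypothetical and are not the mixin's actual implementation. A plain dict stands in for ``task_store``, and the operator's ``crash`` flag simulates a worker lost during polling.

```python
from typing import Any


def execute_resumable_sketch(op: Any, task_store: dict, context: Any = None) -> Any:
    """Hypothetical restatement of the documented reconnect-or-submit flow."""
    external_id = task_store.get(op.external_id_key)
    if external_id is not None:
        status = op.get_job_status(external_id, context)
        if op.is_job_active(status):
            # Still running: reconnect instead of resubmitting.
            op.poll_until_complete(external_id, context)
            return op.get_job_result(external_id, context)
        if op.is_job_succeeded(status):
            # Finished while the worker was down: just collect the result.
            return op.get_job_result(external_id, context)
        # Terminal failure: fall through and submit a fresh job.
    external_id = op.submit_job(context)
    if external_id is not None:
        # Persist before polling so a crash during polling can be recovered.
        task_store[op.external_id_key] = external_id
    op.poll_until_complete(external_id, context)
    return op.get_job_result(external_id, context)


class FakeBackend:
    """Hypothetical external system that records every submitted job."""

    def __init__(self) -> None:
        self.jobs: dict = {}

    def submit(self) -> str:
        job_id = f"job-{len(self.jobs) + 1}"
        self.jobs[job_id] = "RUNNING"
        return job_id


class FakeOperator:
    """Hypothetical operator implementing the six methods against FakeBackend."""

    external_id_key = "batch_job_id"

    def __init__(self, backend: FakeBackend, crash: bool = False) -> None:
        self.backend = backend
        self.crash = crash

    def submit_job(self, context: Any) -> str:
        return self.backend.submit()

    def get_job_status(self, external_id: str, context: Any) -> str:
        return self.backend.jobs[external_id]

    def is_job_active(self, status: str) -> bool:
        return status == "RUNNING"

    def is_job_succeeded(self, status: str) -> bool:
        return status == "SUCCEEDED"

    def poll_until_complete(self, external_id: str, context: Any) -> None:
        if self.crash:
            raise RuntimeError("worker preempted during polling")  # simulated crash
        self.backend.jobs[external_id] = "SUCCEEDED"  # pretend the job finished

    def get_job_result(self, external_id: str, context: Any) -> str:
        return f"result of {external_id}"


backend = FakeBackend()
task_store: dict = {}
try:
    execute_resumable_sketch(FakeOperator(backend, crash=True), task_store)  # attempt 1
except RuntimeError:
    pass
print(execute_resumable_sketch(FakeOperator(backend), task_store))  # result of job-1
print(len(backend.jobs))  # 1: the retry reconnected instead of resubmitting
```

Setting ``backend.jobs["job-1"] = "FAILED"`` before another attempt exercises the third branch: the sketch submits ``job-2`` and stores its identifier in place of the old one.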
+
+Example
+-------
+
+.. code-block:: python
+
+    from airflow.sdk import BaseOperator, ResumableJobMixin
+    from pydantic import JsonValue
+
+
+    class MyBatchOperator(ResumableJobMixin, BaseOperator):
+
+        # Key under which the job identifier is stored in ``task_store``.
+        external_id_key = "batch_job_id"
+
+        def execute(self, context):
+            # The mixin decides whether to reconnect to a stored job or submit a new one.
+            return self.execute_resumable(context)
+
+        # In this example, ``self.hook`` is a placeholder for a hook to your external system.
+        def submit_job(self, context) -> JsonValue:
+            return self.hook.submit_batch(...)
+
+        def get_job_status(self, external_id: JsonValue, context) -> str:
+            return self.hook.get_status(external_id)
+
+        def is_job_active(self, status: str) -> bool:
+            return status in ("RUNNING", "PENDING", "QUEUED")
+
+        def is_job_succeeded(self, status: str) -> bool:
+            return status == "SUCCEEDED"
+
+        def poll_until_complete(self, external_id: JsonValue, context) -> None:
+            # Expected to block until the job finishes and to raise if it fails.
+            self.hook.wait(external_id)
+
+        def get_job_result(self, external_id: JsonValue, context):
+            # This job produces no result worth returning.
+            return None
+
+.. _sdk-resumable-job-mixin-external-id-key:
+
+External ID key
+---------------
+
+The ``external_id_key`` class attribute controls which key is used to store the job identifier
+in ``task_store``. The default value is ``"remote_job_id"``. You can override it on your
+subclass to use a more descriptive name:
+
+.. code-block:: python
+
+    class MyBatchOperator(ResumableJobMixin, BaseOperator):
+        external_id_key = "batch_job_id"
+
+.. warning::
+
+    Do not rename ``external_id_key`` on an operator that is already deployed and has
+    in-flight task instances. Those task instances have stored their job identifiers in the task
+    state store under the previous key. After a rename, the mixin no longer finds them and treats
+    every active retry as a fresh submission, defeating the crash-safety guarantee.